Skip to content

Commit

Permalink
Better job naming strategy for CWL (#51)
Browse files Browse the repository at this point in the history
This commit renames jobs to `step_name`/`job_id` instead of naming them
with a randomly generated string, which simplifies debugging and log
interpretation. In addition, the workflow's persistent id is now included in the `Job`
object, allowing the scheduler to make decisions based on the entire workflow
shape.
  • Loading branch information
GlassOfWhiskey authored Jan 24, 2023
1 parent 022261a commit 556654c
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 13 deletions.
3 changes: 3 additions & 0 deletions streamflow/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ async def run(self) -> MutableMapping[str, Any]:
class Job:
__slots__ = (
"name",
"workflow_id",
"inputs",
"input_directory",
"output_directory",
Expand All @@ -137,12 +138,14 @@ class Job:
def __init__(
self,
name: str,
workflow_id: int,
inputs: MutableMapping[str, Token],
input_directory: str,
output_directory: str,
tmp_directory: str,
):
self.name: str = name
self.workflow_id: int = workflow_id
self.inputs: MutableMapping[str, Token] = inputs
self.input_directory: str = input_directory
self.output_directory: str = output_directory
Expand Down
7 changes: 5 additions & 2 deletions streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,8 @@ def _inject_input(
# Create a schedule step and connect it to the local DeployStep
schedule_step = workflow.create_step(
cls=ScheduleStep,
name=posixpath.join(global_name + "-injector", "__schedule__"),
name=posixpath.join(f"{global_name}-injector", "__schedule__"),
job_prefix=f"{global_name}-injector",
connector_ports={target.deployment.name: deploy_step.get_output_port()},
input_directory=target.workdir or output_directory,
output_directory=target.workdir or output_directory,
Expand Down Expand Up @@ -1545,6 +1546,7 @@ def _translate_command_line_tool(
schedule_step = workflow.create_step(
cls=ScheduleStep,
name=posixpath.join(name_prefix, "__schedule__"),
job_prefix=name_prefix,
connector_ports={
name: step.get_output_port() for name, step in deploy_steps.items()
},
Expand Down Expand Up @@ -2554,7 +2556,8 @@ def translate(self) -> Workflow:
# Create a schedule step and connect it to the local DeployStep
schedule_step = workflow.create_step(
cls=ScheduleStep,
name=posixpath.join(port_name + "-collector", "__schedule__"),
name=posixpath.join(f"{port_name}-collector", "__schedule__"),
job_prefix=f"{port_name}-collector",
connector_ports={
target.deployment.name: deploy_step.get_output_port()
},
Expand Down
1 change: 1 addition & 0 deletions streamflow/recovery/failure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async def _do_handle_failure(self, job: Job, step: Step) -> CommandOutput:
self.jobs[job.name] = JobVersion(
job=Job(
name=job.name,
workflow_id=step.workflow.persistent_id,
inputs=dict(job.inputs),
input_directory=job.input_directory,
output_directory=job.output_directory,
Expand Down
12 changes: 9 additions & 3 deletions streamflow/workflow/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
)
from streamflow.core.persistence import DatabaseLoadingContext
from streamflow.core.scheduling import HardwareRequirement
from streamflow.core.utils import random_name
from streamflow.core.workflow import (
Command,
CommandOutput,
Expand Down Expand Up @@ -564,6 +563,7 @@ async def _run_job(
raise WorkflowExecutionException(f"Step {self.name} received a null job")
job = Job(
name=job.name,
workflow_id=self.workflow.persistent_id,
inputs=inputs,
input_directory=job.input_directory,
output_directory=job.output_directory,
Expand Down Expand Up @@ -1107,13 +1107,15 @@ def __init__(
binding_config: BindingConfig,
connector_ports: MutableMapping[str, ConnectorPort],
job_port: JobPort | None = None,
job_prefix: str | None = None,
hardware_requirement: HardwareRequirement | None = None,
input_directory: str | None = None,
output_directory: str | None = None,
tmp_directory: str | None = None,
):
super().__init__(name, workflow)
self.binding_config: BindingConfig = binding_config
self.job_prefix: str = job_prefix or name
self.hardware_requirement: HardwareRequirement | None = hardware_requirement
self.input_directory: str | None = input_directory
self.output_directory: str | None = output_directory
Expand Down Expand Up @@ -1250,7 +1252,10 @@ async def run(self):
inputs = inputs_map.pop(tag)
# Create Job
job = Job(
name=random_name(),
name=posixpath.join(
self.job_prefix, tag.split(".")[-1]
),
workflow_id=self.workflow.persistent_id,
inputs=inputs,
input_directory=self.input_directory,
output_directory=self.output_directory,
Expand All @@ -1274,7 +1279,8 @@ async def run(self):
else:
# Create Job
job = Job(
name=random_name(),
name=posixpath.join(self.job_prefix, "0"),
workflow_id=self.workflow.persistent_id,
inputs={},
input_directory=self.input_directory,
output_directory=self.output_directory,
Expand Down
21 changes: 13 additions & 8 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import pytest
import asyncio

from typing import MutableMapping, MutableSequence, Optional
from tests.conftest import get_docker_deploy_config

import pytest

from streamflow.core import utils
from streamflow.core.workflow import Job, Status
from streamflow.core.config import BindingConfig, Config
from streamflow.core.context import StreamFlowContext
from streamflow.core.config import Config, BindingConfig
from streamflow.core.scheduling import AvailableLocation, Hardware
from streamflow.core.deployment import (
LOCAL_LOCATION,
BindingFilter,
LOCAL_LOCATION,
LocalTarget,
Target,
)

from streamflow.core.scheduling import AvailableLocation, Hardware
from streamflow.core.workflow import Job, Status
from streamflow.deployment.connector import LocalConnector
from tests.conftest import get_docker_deploy_config


class CustomConnector(LocalConnector):
Expand Down Expand Up @@ -87,6 +86,7 @@ async def test_single_env_few_resources(context: StreamFlowContext):
jobs.append(
Job(
name=utils.random_name(),
workflow_id=0,
inputs={},
input_directory=utils.random_name(),
output_directory=utils.random_name(),
Expand Down Expand Up @@ -161,6 +161,7 @@ async def test_single_env_enough_resources(context: StreamFlowContext):
jobs.append(
Job(
name=utils.random_name(),
workflow_id=0,
inputs={},
input_directory=utils.random_name(),
output_directory=utils.random_name(),
Expand Down Expand Up @@ -224,6 +225,7 @@ async def test_multi_env(context: StreamFlowContext):
(
Job(
name=utils.random_name(),
workflow_id=0,
inputs={},
input_directory=utils.random_name(),
output_directory=utils.random_name(),
Expand Down Expand Up @@ -277,6 +279,7 @@ async def test_multi_targets_one_job(context: StreamFlowContext):
# Create fake job with two targets and schedule it
job = Job(
name=utils.random_name(),
workflow_id=0,
inputs={},
input_directory=utils.random_name(),
output_directory=utils.random_name(),
Expand Down Expand Up @@ -343,6 +346,7 @@ async def test_multi_targets_two_jobs(context: StreamFlowContext):
jobs.append(
Job(
name=utils.random_name(),
workflow_id=0,
inputs={},
input_directory=utils.random_name(),
output_directory=utils.random_name(),
Expand Down Expand Up @@ -400,6 +404,7 @@ async def test_binding_filter(context: StreamFlowContext):
"""Test Binding Filter using a job with two targets both free. With the CustomBindingFilter the scheduling will choose the second target"""
job = Job(
name=utils.random_name(),
workflow_id=0,
inputs={},
input_directory=utils.random_name(),
output_directory=utils.random_name(),
Expand Down

0 comments on commit 556654c

Please sign in to comment.