diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index ccb1045a5..2c7e73d27 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -73,6 +73,7 @@ jobs: - name: "Install Docker (MacOS X)" uses: douglascamata/setup-docker-macos-action@v1-alpha.9 if: ${{ startsWith(matrix.on, 'macos-') }} + continue-on-error: true - uses: docker/setup-qemu-action@v2 - name: "Install Apptainer" uses: eWaterCycle/setup-apptainer@v2 @@ -212,6 +213,7 @@ jobs: - name: "Install Docker (MacOs X)" uses: douglascamata/setup-docker-macos-action@v1-alpha.9 if: ${{ startsWith(matrix.on, 'macos-') }} + continue-on-error: true - uses: docker/setup-qemu-action@v2 - name: "Install Apptainer" uses: eWaterCycle/setup-apptainer@v2 diff --git a/README.md b/README.md index a1003bbad..c17461f11 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,8 @@ docker run -d \ --mount type=bind,source="$(pwd)"/my-project,target=/streamflow/project \ --mount type=bind,source="$(pwd)"/results,target=/streamflow/results \ --mount type=bind,source="$(pwd)"/tmp,target=/tmp/streamflow \ - alphaunito/streamflow run /streamflow/project/streamflow.yml + alphaunito/streamflow \ + streamflow run /streamflow/project/streamflow.yml ``` #### Kubernetes diff --git a/docs/source/advanced/multiple-targets.rst b/docs/source/advanced/multiple-targets.rst new file mode 100644 index 000000000..cc81942ce --- /dev/null +++ b/docs/source/advanced/multiple-targets.rst @@ -0,0 +1,27 @@ +================ +Multiple targets +================ + +StreamFlow lets users to map steps and ports to multiple targets in the :ref:`StreamFlow file `. A step bound to multiple locations can be scheduled on each of them at runtime. Plus, if a step encloses multiple instances (e.g., in a CWL ``scatter`` operation), they can run on different targets. + +The `filters` directive defines one or more strategies to select a target among the set of available ones (or among a subset) at runtime. By default, all available targets will be evaluated in the order of appearance in the ``target`` directive. + +Users can select a given :ref:`BindingFilter ` implementation by specifying its name in the ``filters`` directive of a ``binding`` object. If multiple filters are declared, they are applied to the target list in the order of appearance. For example, to evaluate targets in random order at every allocation request, users can specify the following: + +.. code-block:: yaml + + workflows: + example: + type: cwl + config: + file: main.cwl + settings: config.yml + bindings: + - step: /compile + target: + - deployment: first-deployment + - deployment: second-deployment + filters: + - shuffle + +Conversely, a file or directory port bound to multiple locations can be retrieved from each of them at runtime. StreamFlow will always try to minimize the overhead of data transfers, using local data whenever possible. \ No newline at end of file diff --git a/docs/source/advanced/port-targets.rst b/docs/source/advanced/port-targets.rst new file mode 100644 index 000000000..f745324c6 --- /dev/null +++ b/docs/source/advanced/port-targets.rst @@ -0,0 +1,35 @@ +============ +Port targets +============ + +In the default case, when a workflow receives files or folders as initial input objects, StreamFlow looks for them in the local file system. Along the same line, whenever a workflow step produces input files or folders, StreamFlow searches them in the location where the step was executed. 
+ +However, there are cases in which these assumptions are not valid. To correctly handle these cases, the user can specify port targets in the ``bindings`` list of a workflow. Port targets are similar to step targets described :ref:`here `, but bind ports instead of steps. + +In particular, a port binding contains a ``port`` directive referring to a specific input/output port in the workflow, a ``target`` directive referring to a deployment entry in the ``deployments`` section of the StreamFlow file, and a (mandatory) ``workdir`` entry identifies the base path where the data should be placed. + +Similarly to steps, ports are uniquely identified using a Posix-like path, where the port is mapped to a file, and the related step is mapped to a folder. Consider the following example, which refers to :ref:`this ` workflow: + +.. code-block:: yaml + + version: v1.0 + workflows: + extract-and-compile: + type: cwl + config: + file: main.cwl + settings: config.yml + bindings: + - step: /compile/src + target: + deployment: hpc-slurm + workdir: /archive/home/myuser + + deployments: + hpc-slurm: + type: slurm + config: + ... + +Here, the ``/compile/src`` path refers to the ``src`` port of the ``/compile`` step. StreamFlow will search for the file the ``src`` port requires directly on the remote ``hpc-slurm`` location in the ``/archive/home/myuser`` path. + diff --git a/docs/source/advanced/stacked-locations.rst b/docs/source/advanced/stacked-locations.rst new file mode 100644 index 000000000..fedca9897 --- /dev/null +++ b/docs/source/advanced/stacked-locations.rst @@ -0,0 +1,28 @@ +================= +Stacked locations +================= + +StreamFlow supports the concept of stacked locations, adhering to the separation of concerns principle. This allows the user to describe complex execution environments, e.g., a :ref:`Singularity container ` launched by a :ref:`Slurm queue manager ` called through an :ref:`SSH connection `. + +Users can define stacked locations using the ``wraps`` property in the :ref:`StreamFlow file `. For example, consider a remote Slurm queue manager that can be contacted by connecting to the login node of an HPC facility using SSH. This is a typical configuration for HPC systems. Then a user can write: + +.. code-block:: yaml + + deployments: + ssh-hpc: + type: ssh + config: + ... + slurm-hpc: + type: slurm + config: + ... + wraps: ssh-hpc + +.. warning:: + + Note that in StreamFlow ``v0.1``, the queue manager connectors (:ref:`Slurm ` and :ref:`PBS `) are inherited from the :ref:`SSHConnector ` at the implementation level. Consequently, all the properties needed to open an SSH connection to the HPC login node (e.g., ``hostname``, ``username``, and ``sshKey``) were defined directly in the ``config`` section of the queue manager deployment. This path is still supported by StreamFlow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. + +Note that not all deployment types can wrap other locations. Indeed, only connectors extending the :ref:`ConnectorWrapper ` interface support the ``wraps`` directive. Specifying the ``wraps`` directive on a container type that does not support it will result in an error during StreamFlow initialization. Conversely, if no explicit ``wraps`` directive is specified for a :ref:`ConnectorWrapper `, it wraps the :ref:`LocalConnector `. + +The ``wraps`` directive only supports wrapping a single inner location. However, a single location can be wrapped by multiple deployment definitions. 
The :ref:`DeploymentManager ` component is responsible for guaranteeing the correct order of deployment and undeployment for stacked locations. \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index 9f2a47766..62e0294e8 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -17,12 +17,13 @@ # -- Project information ----------------------------------------------------- import importlib +import os.path project = 'StreamFlow' -copyright = '2021, Alpha Research Group, Computer Science Dept., University of Torino' +copyright = '2023, Alpha Research Group, Computer Science Dept., University of Torino' author = 'Iacopo Colonnelli' -version = '0.1' -release = '0.1.0' +version = '0.2' +release = '0.2.0' # -- General configuration --------------------------------------------------- @@ -80,6 +81,7 @@ def setup(app): } # JSONSchema extensions +sjs = importlib.import_module("sphinx-jsonschema") sjs_wide_format = importlib.import_module("sphinx-jsonschema.wide_format") @@ -208,3 +210,27 @@ def patched_run(self, schema, pointer=''): original_run = sjs_wide_format.WideFormat.run sjs_wide_format.WideFormat.run = patched_run + + +def patched_get_json_data(self): + schema, source, pointer = original_get_json_data(self) + + if self.arguments: + filename, pointer = self._splitpointer(self.arguments[0]) + else: + filename, pointer = None, '' + + if 'allOf' in schema: + for obj in schema['allOf']: + if '$ref' in obj: + target_file = os.path.join(os.path.dirname(filename), obj['$ref']) + target_schema, _ = self.from_file(target_file) + target_schema = self.ordered_load(target_schema) + schema['properties'] = {**target_schema.get('properties', {}), **schema['properties']} + del schema['allOf'] + schema['properties'] = dict(sorted(schema['properties'].items())) + return schema, source, pointer + + +original_get_json_data = sjs.JsonSchema.get_json_data +sjs.JsonSchema.get_json_data = patched_get_json_data \ No newline at end of file diff --git a/docs/source/connector/docker-compose.rst b/docs/source/connector/docker-compose.rst index 2b9b41913..dd8fdeee1 100644 --- a/docs/source/connector/docker-compose.rst +++ b/docs/source/connector/docker-compose.rst @@ -4,5 +4,5 @@ DockerComposeConnector The `DockerCompose `_ connector can spawn complex, multi-container environments described in a Docker Compose file locally on the StreamFlow node. The entire set of ``services`` in the Docker Compose file contitutes the unit of deployment, while a single service is the unit of binding. Finally, the single instance of a potentially replicated service is the unit of scheduling. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/docker-compose.json +.. jsonschema:: ../../../streamflow/deployment/connector/schemas/docker-compose.json :lift_description: true diff --git a/docs/source/connector/docker.rst b/docs/source/connector/docker.rst index 76cbc6a15..6606fb376 100644 --- a/docs/source/connector/docker.rst +++ b/docs/source/connector/docker.rst @@ -4,5 +4,5 @@ DockerConnector The `Docker `_ connector can spawn one or more instances of a Docker container locally on the StreamFlow node. The units of deployment and binding for this connector correspond to the set of homogeneous container instances, while the unit of scheduling is the single instance. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/docker.json +.. 
jsonschema:: ../../../streamflow/deployment/connector/schemas/docker.json :lift_description: true \ No newline at end of file diff --git a/docs/source/connector/flux.rst b/docs/source/connector/flux.rst index b962113c8..1a0ce859b 100644 --- a/docs/source/connector/flux.rst +++ b/docs/source/connector/flux.rst @@ -2,9 +2,60 @@ FluxConnector ============= -The `Flux Framework `_ connector allows running jobs on a cluster with Flux Framework in a High Performance Computing Context. Although Flux can work in a local testing container or a cloud environment and has a Python SDK, to match the design here, we follow suit and use a :ref:`SSHConnection ` pointing to a login node. +The `Flux Framework `_ connector allows running jobs on a cluster with Flux Framework in a High Performance Computing context. Although Flux can work in a local testing container or a cloud environment and has a Python SDK, to match the design here, we follow suit and inherit from the :ref:`QueueManagerConnector `. In this way, users can offload jobs to local or remote Flux instances using the :ref:`stacked locations ` mechanism. The HPC facility is supposed to be constantly active, reducing the deployment phase to deploy the inner connector (e.g., to create an :ref:`SSHConnection ` pointing to an HPC login node). + +.. warning:: + + Note that in StreamFlow ``v0.1``, the ``QueueManagerConnector`` directly inherited from the :ref:`SSHConnector ` at the implementation level. Consequently, all the properties needed to open an SSH connection to the HPC login node (e.g., ``hostname``, ``username``, and ``sshKey``) were defined directly in the ``QueueManagerConnector``. This path is still supported by StreamFlow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. + +Interaction with the Flux scheduler happens through a Bash script with ``#flux`` directives. Users can pass the path of a custom script to the connector using the ``file`` attribute of the :ref:`FluxService ` configuration. This file is interpreted as a `Jinja2 `_ template and populated at runtime by the connector. Alternatively, users can pass Flux options directly from YAML using the other options of a :ref:`FluxService ` object. + +As an example, suppose you have a Flux template script called ``batch.sh``, with the following content: + +.. code-block:: bash + + #!/bin/bash + + #flux --nodes=1 + #flux --queue=queue_name + + {{streamflow_command}} + +A Flux deployment configuration which uses the ``batch.sh`` file to spawn jobs can be written as follows: + +.. code-block:: yaml + + deployments: + flux-example: + type: flux + config: + services: + example: + file: batch.sh + +Alternatively, the same behaviour can be recreated by directly passing options through the YAML configuration, as follows: + +.. code-block:: yaml + + deployments: + flux-example: + type: flux + config: + services: + example: + nodes: 1 + queue: queue_name + +Being passed directly to the ``flux batch`` command line, the YAML options have higher priority than the file-based ones. + +.. warning:: + + Note that the ``file`` property in the upper configuration level, i.e., outside a ``service`` definition, is still supported in StreamFlow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. For a quick demo or tutorial, see our `example workflow `_. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/queue_manager.json +.. 
jsonschema:: ../../../streamflow/deployment/connector/schemas/flux.json :lift_description: true + :lift_definitions: true + :auto_reference: true + :auto_target: true diff --git a/docs/source/connector/helm3.rst b/docs/source/connector/helm3.rst index 39502cac1..a6414793d 100644 --- a/docs/source/connector/helm3.rst +++ b/docs/source/connector/helm3.rst @@ -4,5 +4,5 @@ Helm3Connector The `Helm v3 `_ connector can spawn complex, multi-container environments on a `Kubernetes `_ cluster. The deployment unit is the entire Helm release, while the binding unit is the single container in a ``Pod``. StreamFlow requires each container in a Helm release to have a unique ``name`` attribute, allowing an unambiguous identification. Finally, the scheduling unit is the single instance of a potentially replicated container in a ``ReplicaSet``. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/helm3.json +.. jsonschema:: ../../../streamflow/deployment/connector/schemas/helm3.json :lift_description: true diff --git a/docs/source/connector/kubernetes.rst b/docs/source/connector/kubernetes.rst new file mode 100644 index 000000000..0891a2e5a --- /dev/null +++ b/docs/source/connector/kubernetes.rst @@ -0,0 +1,8 @@ +=================== +KubernetesConnector +=================== + +The `Kubernetes `_ connector can spawn complex, multi-container environments on a Kubernetes cluster. The deployment unit is a set of Kubernetes YAML files, which are deployed in the order they are written in the ``config`` section and undeployed in the reverse order. The binding unit is the single container in a ``Pod``. StreamFlow requires each container in a Kubernetes namespace to have a unique ``name`` attribute, allowing an unambiguous identification. Finally, the scheduling unit is the single instance of a potentially replicated container in a ``ReplicaSet``. + +.. jsonschema:: ../../../streamflow/deployment/connector/schemas/kubernetes.json + :lift_description: true \ No newline at end of file diff --git a/docs/source/connector/occam.rst b/docs/source/connector/occam.rst index 3c24c5249..d817d1667 100644 --- a/docs/source/connector/occam.rst +++ b/docs/source/connector/occam.rst @@ -8,5 +8,5 @@ It is different from standard HPC facilities for two main reasons. First, users This connector allows StreamFlow to offload computation to multi-container environments deployed on the Occam facility. The deployment unit is a multi-container environment deployed on one or more computing nodes. Multi-container environments are described in a YAML file with a syntax similar to the ``service`` section of `Docker Compose `_. Users can pass this file to the connector through the ``file`` parameter. The unit of binding is the single top-level entry in the file, while the scheduling unit is the single container instance. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/occam.json +.. jsonschema:: ../../../streamflow/deployment/connector/schemas/occam.json :lift_description: true diff --git a/docs/source/connector/pbs.rst b/docs/source/connector/pbs.rst index 3063b0dfa..1b4fb42ee 100644 --- a/docs/source/connector/pbs.rst +++ b/docs/source/connector/pbs.rst @@ -2,11 +2,63 @@ PBSConnector ===================== -The `PBS `_ connector allows offloading execution to High-Performance Computing (HPC) facilities orchestrated by the PBS queue manager. The HPC facility is supposed to be constantly active, reducing the deployment phase to creating an :ref:`SSHConnection ` pointing to its login node. 
+The `PBS `_ connector allows offloading execution to High-Performance Computing (HPC) facilities orchestrated by the PBS queue manager. It extends the :ref:`QueueManagerConnector `, which inherits from the :ref:`ConnectorWrapper ` interface, allowing users to offload jobs to local or remote PBS controllers using the :ref:`stacked locations ` mechanism. The HPC facility is supposed to be constantly active, reducing the deployment phase to deploy the inner connector (e.g., to create an :ref:`SSHConnection ` pointing to an HPC login node). -Interaction with the PBS scheduler happens through a Bash script with ``#PBS`` directives. Users can pass the path of a custom script to the connector using the ``file`` attribute. This file is interpreted as a `Jinja2 `_ template and populated at runtime by the connector. +.. warning:: + + Note that in StreamFlow ``v0.1``, the ``QueueManagerConnector`` directly inherited from the :ref:`SSHConnector ` at the implementation level. Consequently, all the properties needed to open an SSH connection to the HPC login node (e.g., ``hostname``, ``username``, and ``sshKey``) were defined directly in the ``QueueManagerConnector``. This path is still supported by StreamFlow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. + +Interaction with the PBS scheduler happens through a Bash script with ``#PBS`` directives. Users can pass the path of a custom script to the connector using the ``file`` attribute of the :ref:`PBSService ` configuration. This file is interpreted as a `Jinja2 `_ template and populated at runtime by the connector. Alternatively, users can pass PBS options directly from YAML using the other options of a :ref:`PBSService ` object. + +As an example, suppose to have a PBS template script called ``qsub.sh``, with the following content: + +.. code-block:: bash + + #!/bin/bash + + #PBS -l nodes=1 + #PBS -q queue_name + #PBS -l mem=1gb + + {{streamflow_command}} + +A PBS deployment configuration which uses the ``qsub.sh`` file to spawn jobs can be written as follows: + +.. code-block:: yaml + + deployments: + pbs-example: + type: pbs + config: + services: + example: + file: qsub.sh + +Alternatively, the same behaviour can be recreated by directly passing options through the YAML configuration, as follows: + +.. code-block:: yaml + + deployments: + pbs-example: + type: pbs + config: + services: + example: + destination: queue_name + resources: + mem: 1gb + nodes: 1 + +Being passed directly to the ``qsub`` command line, the YAML options have higher priority than the file-based ones. + +.. warning:: + + Note that the ``file`` property in the upper configuration level, i.e., outside a ``service`` definition, is still supported in Streamflow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. The unit of binding is the entire HPC facility. In contrast, the scheduling unit is a single job placement in the PBS queue. Users can limit the maximum number of concurrently placed jobs by setting the ``maxConcurrentJobs`` parameter. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/queue_manager.json - :lift_description: true \ No newline at end of file +.. 
jsonschema:: ../../../streamflow/deployment/connector/schemas/pbs.json + :lift_description: true + :lift_definitions: true + :auto_reference: true + :auto_target: true \ No newline at end of file diff --git a/docs/source/connector/queue-manager.rst b/docs/source/connector/queue-manager.rst new file mode 100644 index 000000000..ac0edcc70 --- /dev/null +++ b/docs/source/connector/queue-manager.rst @@ -0,0 +1,12 @@ +===================== +QueueManagerConnector +===================== + +The ``QueueManagerConnector`` is an abstract connector that serves as a base class to implement High Performance Computing connectors, based on queue managers (e.g., :ref:`Slurm `, :ref:`PBS `, and :ref:`Flux `). It extends the :ref:`ConnectorWrapper ` interface, allowing users to offload jobs to local or remote queue managers. The HPC facility is supposed to be constantly active, reducing the deployment phase to deploy the inner connector (e.g., to create an :ref:`SSHConnection ` pointing to an HPC login node). + +.. warning:: + + Note that in StreamFlow ``v0.1``, the ``QueueManagerConnector`` directly inherited from the :ref:`SSHConnector ` at the implementation level. Consequently, all the properties needed to open an SSH connection to the HPC login node (e.g., ``hostname``, ``username``, and ``sshKey``) were defined directly in the ``QueueManagerConnector``. This path is still supported by StreamFlow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. + + +.. jsonschema:: ../../../streamflow/deployment/connector/schemas/queue_manager.json \ No newline at end of file diff --git a/docs/source/connector/singularity.rst b/docs/source/connector/singularity.rst index df48a67f7..2cf03c909 100644 --- a/docs/source/connector/singularity.rst +++ b/docs/source/connector/singularity.rst @@ -4,5 +4,5 @@ SingularityConnector The `Singularity `_ connector can spawn one or more instances of a Singularity container locally on the StreamFlow node. The units of deployment and binding for this connector correspond to the set of homogeneous container instances, while the unit of scheduling is the single instance. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/singularity.json +.. jsonschema:: ../../../streamflow/deployment/connector/schemas/singularity.json :lift_description: true \ No newline at end of file diff --git a/docs/source/connector/slurm.rst b/docs/source/connector/slurm.rst index d72d7a4d7..90b4f0421 100644 --- a/docs/source/connector/slurm.rst +++ b/docs/source/connector/slurm.rst @@ -2,11 +2,62 @@ SlurmConnector ===================== -The `Slurm `_ connector allows offloading execution to High-Performance Computing (HPC) facilities orchestrated by the Slurm queue manager. The HPC facility is supposed to be constantly active, reducing the deployment phase to creating an :ref:`SSHConnection ` pointing to its login node. +The `Slurm `_ connector allows offloading execution to High-Performance Computing (HPC) facilities orchestrated by the Slurm queue manager. It extends the :ref:`QueueManagerConnector `, which inherits from the :ref:`ConnectorWrapper ` interface, allowing users to offload jobs to local or remote Slurm controllers using the :ref:`stacked locations ` mechanism. The HPC facility is supposed to be constantly active, reducing the deployment phase to deploy the inner connector (e.g., to create an :ref:`SSHConnection ` pointing to an HPC login node). -Interaction with the Slurm scheduler happens through a Bash script with ``#SBATCH`` directives. 
Users can pass the path of a custom script to the connector using the ``file`` attribute. This file is interpreted as a `Jinja2 `_ template and populated at runtime by the connector. +.. warning:: + + Note that in StreamFlow ``v0.1``, the ``QueueManagerConnector`` directly inherited from the :ref:`SSHConnector ` at the implementation level. Consequently, all the properties needed to open an SSH connection to the HPC login node (e.g., ``hostname``, ``username``, and ``sshKey``) were defined directly in the ``QueueManagerConnector``. This path is still supported by StreamFlow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. + +Interaction with the Slurm scheduler happens through a Bash script with ``#SBATCH`` directives. Users can pass the path of a custom script to the connector using the ``file`` attribute of the :ref:`SlurmService ` configuration. This file is interpreted as a `Jinja2 `_ template and populated at runtime by the connector. Alternatively, users can pass Slurm options directly from YAML using the other options of a :ref:`SlurmService ` object. + +As an example, suppose you have a Slurm template script called ``sbatch.sh``, with the following content: + +.. code-block:: bash + + #!/bin/bash + + #SBATCH --nodes=1 + #SBATCH --partition=queue_name + #SBATCH --mem=1gb + + {{streamflow_command}} + +A Slurm deployment configuration which uses the ``sbatch.sh`` file to spawn jobs can be written as follows: + +.. code-block:: yaml + + deployments: + slurm-example: + type: slurm + config: + services: + example: + file: sbatch.sh + +Alternatively, the same behaviour can be recreated by directly passing options through the YAML configuration, as follows: + +.. code-block:: yaml + + deployments: + slurm-example: + type: slurm + config: + services: + example: + nodes: 1 + partition: queue_name + mem: 1gb + +Being passed directly to the ``sbatch`` command line, the YAML options have higher priority than the file-based ones. + +.. warning:: + + Note that the ``file`` property in the upper configuration level, i.e., outside a ``service`` definition, is still supported in StreamFlow ``v0.2``, but it is deprecated and will be removed in StreamFlow ``v0.3``. The unit of binding is the entire HPC facility. In contrast, the scheduling unit is a single job placement in the Slurm queue. Users can limit the maximum number of concurrently placed jobs by setting the ``maxConcurrentJobs`` parameter. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/queue_manager.json +.. jsonschema:: ../../../streamflow/deployment/connector/schemas/slurm.json :lift_description: true + :lift_definitions: true + :auto_reference: true + :auto_target: true diff --git a/docs/source/connector/ssh.rst b/docs/source/connector/ssh.rst index 7d6156a3f..87cc0250d 100644 --- a/docs/source/connector/ssh.rst +++ b/docs/source/connector/ssh.rst @@ -6,7 +6,7 @@ The `Secure SHell `_ (SSH) connector A single deployment can contain multiple nodes, which represent the deployment unit. Note that SSH nodes are already active, reducing the "deployment" phase to opening an :ref:`SSHConnection `. Nodes in the same deployment are not supposed to be directly connected. Consequently, data transfers always involve the StreamFlow management node, adopting a two-step copy strategy. The binding unit and the scheduling unit coincide with the single SSH host. -.. jsonschema:: ../../../streamflow/config/schemas/v1.0/ssh.json +.. 
jsonschema:: ../../../streamflow/deployment/connector/schemas/ssh.json :lift_description: true :lift_definitions: true :auto_reference: true diff --git a/docs/source/cwl/cwl-runner.rst b/docs/source/cwl/cwl-runner.rst index 1fce2077e..c971c9f6f 100644 --- a/docs/source/cwl/cwl-runner.rst +++ b/docs/source/cwl/cwl-runner.rst @@ -22,13 +22,13 @@ command is equivalent to a ``streamflow run`` command with the following ``strea In addition to the standard parameters, it is possible to pass a ``--streamflow-file`` argument to the ``cwl-runner`` CLI with the path of a StreamFlow file containing deployments and bindings (see the :ref:`StreamFlow file ` section). The ``workflows`` section of this file must have a single entry containing a list of ``bindings``. If present, the ``type`` and ``config`` entries will be ignored. Files containing multiple workflow entries will throw an exception. -For example, the workflow described in the :ref:`operations ` section of this guide can also be executed with the following command +For example, the workflow described :ref:`here ` can also be executed with the following command .. code-block:: bash cwl-runner --streamflow-file /path/to/streamflow.yml main.cwl config.cwl -where the `streamflow.yml` fail contains these lines +where the ``streamflow.yml`` fail contains these lines .. code-block:: yaml diff --git a/docs/source/cwl/docker-requirement.rst b/docs/source/cwl/docker-requirement.rst new file mode 100644 index 000000000..032aa56dc --- /dev/null +++ b/docs/source/cwl/docker-requirement.rst @@ -0,0 +1,66 @@ +====================== +CWL Docker Requirement +====================== + +The CWL standard supports a ``DockerRequirement`` feature to execute one or more workflow steps inside a `Docker container `_. A CWL runner must then ensure that all input files are available inside the container and choose a specific Docker runner to deploy the container. For example, the following script invokes a `Node.js `_ command inside a Docker image called `node:slim `_: + +.. code-block:: yaml + + cwlVersion: v1.2 + class: CommandLineTool + baseCommand: node + requirements: + DockerRequirement: + dockerPull: node:slim + inputs: + src: + type: File + inputBinding: + position: 1 + outputs: + example_out: + type: stdout + stdout: output.txt + +By default, StreamFlow autmoatically maps a step with the ``DockerRequirement`` option onto a :ref:`Docker ` deployment with the specified image. This mapping is pretty much equivalent to the following ``streamflow.yml`` file: + +.. code-block:: yaml + + version: v1.0 + workflows: + example: + type: cwl + config: + file: processfile + settings: jobfile + bindings: + - step: / + target: + deployment: docker-example + + deployments: + docker-example: + type: docker + config: + image: node:slim + +StreamFlow also supports the possibility to map a CWL ``DockerRequirement`` onto different types of connectors through the :ref:`CWLDockerTranslator ` extension point. In particular, the ``docker`` section of a workflow configuration can bind each step or subworkflow to a specific translator type, making it possible to convert a pure CWL workflow with ``DockerRequirement`` features into a hybrid workflow. + +As an example, the following ``streamflow.yml`` file runs the above ``CommandLineTool`` using a :ref:`SingularityConnector ` instead of a :ref:`DockerConnector ` to spawn the container: + +.. 
code-block:: yaml + + version: v1.0 + workflows: + example: + type: cwl + config: + file: processfile + settings: jobfile + docker: + step: / + deployment: + type: singularity + config: {} + +In detail, StreamFlow instantiates a :ref:`SingularityCWLDockerTranslator ` passing the content of the ``config`` field directly to the constructor. The translator is then in charge of generating a :ref:`SingularityConnector ` instance with the specified configuration for each CWL ``DockerRequirement`` configuration in the target subworkflow. \ No newline at end of file diff --git a/docs/source/cwl/docker/docker.rst b/docs/source/cwl/docker/docker.rst new file mode 100644 index 000000000..3d7b43917 --- /dev/null +++ b/docs/source/cwl/docker/docker.rst @@ -0,0 +1,8 @@ +========================= +DockerCWLDockerTranslator +========================= + +The Docker :ref:`CWLDockerTranslator ` instantiates a :ref:`DockerConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. + +.. jsonschema:: ../../../../streamflow/cwl/requirement/docker/schemas/docker.json + :lift_description: true diff --git a/docs/source/cwl/docker/kubernetes.rst b/docs/source/cwl/docker/kubernetes.rst new file mode 100644 index 000000000..0812b75a4 --- /dev/null +++ b/docs/source/cwl/docker/kubernetes.rst @@ -0,0 +1,8 @@ +============================= +KubernetesCWLDockerTranslator +============================= + +The Kubernetes :ref:`CWLDockerTranslator ` instantiates a :ref:`KubernetesConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. + +.. jsonschema:: ../../../../streamflow/cwl/requirement/docker/schemas/kubernetes.json + :lift_description: true diff --git a/docs/source/cwl/docker/singularity.rst b/docs/source/cwl/docker/singularity.rst new file mode 100644 index 000000000..f0f129d01 --- /dev/null +++ b/docs/source/cwl/docker/singularity.rst @@ -0,0 +1,8 @@ +============================== +SingularityCWLDockerTranslator +============================== + +The Singularity :ref:`CWLDockerTranslator ` instantiates a :ref:`SingularityConnector ` instance with the given configuration for every CWL :ref:`DockerRequirement ` specification in the selected subworkflow. + +.. jsonschema:: ../../../../streamflow/cwl/requirement/docker/schemas/docker.json + :lift_description: true diff --git a/docs/source/ext/binding-filter.rst b/docs/source/ext/binding-filter.rst new file mode 100644 index 000000000..31ac1b69b --- /dev/null +++ b/docs/source/ext/binding-filter.rst @@ -0,0 +1,25 @@ +============= +BindingFilter +============= + +StreamFlow lets users map steps to :ref:`multiple targets `. A ``BindingFilter`` object implements a strategy to manipulate and reorder the list of targets bound to a given step before the StreamFlow :ref:`Scheduler ` component evaluates them. The ``BindingFilter`` interface specified in the ``streamflow.core.deployment`` module contains a single ``get_targets`` method: + +.. code-block:: python + + async def get_targets( + self, job: Job, targets: MutableSequence[Target] + ) -> MutableSequence[Target]: + ... + +The ``get_targets`` method receives a ``Job`` object and a list of ``Target`` objects, the list of targets specified by the user, and returns another list of ``Target`` objects. The :ref:`Scheduler ` component will evaluate the returned list of targets to find an allocation for the ``Job`` object. 
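As an illustration, the following sketch shows one possible custom filter that rotates the evaluation order based on the job name, so that the instances of a scattered step tend to be evaluated starting from different targets. This is only a sketch: the import paths follow the module names mentioned in this section, the ``name`` attribute of the ``Job`` object is assumed, and, depending on the StreamFlow version, additional boilerplate (e.g., a configuration schema) may be required to register the class as a plugin.

.. code-block:: python

   from collections.abc import MutableSequence

   from streamflow.core.deployment import BindingFilter, Target
   from streamflow.core.workflow import Job


   class RotateBindingFilter(BindingFilter):
       """Hypothetical filter rotating the target list by a job-dependent offset."""

       async def get_targets(
           self, job: Job, targets: MutableSequence[Target]
       ) -> MutableSequence[Target]:
           targets = list(targets)
           if not targets:
               return targets
           # Derive an offset from the job name (assumed attribute) and rotate
           # the user-specified list, preserving its relative order.
           offset = hash(job.name) % len(targets)
           return targets[offset:] + targets[:offset]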
+ +By default, if no ``BindingFilter`` is specified for a multi-target step binding, all the ``Target`` objects will be evaluated in the original order. In addition, StreamFlow defines a ``ShuffleBindingFilter`` implementation to randomise the evaluation order at any invocation. + +Implementations +=============== + +======= ========================================================= +Type Class +======= ========================================================= +shuffle streamflow.deployment.filter.shuffle.ShuffleBindingFilter +======= ========================================================= diff --git a/docs/source/ext/connector.rst b/docs/source/ext/connector.rst new file mode 100644 index 000000000..b3f2485d1 --- /dev/null +++ b/docs/source/ext/connector.rst @@ -0,0 +1,123 @@ +========= +Connector +========= + +StreamFlow demands the lifecycle management of each execution environment to a specific implementation of the ``Connector`` interface. In particular, a single ``Connector`` object is created for each ``deployment`` object described in the :ref:`StreamFlow file `. + +The ``streamflow.core.deployment`` module defines the ``Connector`` interface, which exposes the following public methods: + +.. code-block:: python + + async def copy_local_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + read_only: bool = False, + ) -> None: + ... + + async def copy_remote_to_local( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + read_only: bool = False, + ) -> None: + ... + + async def copy_remote_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + source_location: Location, + source_connector: Connector | None = None, + read_only: bool = False, + ) -> None: + ... + + async def deploy( + self, external: bool + ) -> None: + ... + + async def get_available_locations( + self, + service: str | None = None, + input_directory: str | None = None, + output_directory: str | None = None, + tmp_directory: str | None = None, + ) -> MutableMapping[str, AvailableLocation]: + ... + + async def run( + self, + location: Location, + command: MutableSequence[str], + environment: MutableMapping[str, str] = None, + workdir: str | None = None, + stdin: int | str | None = None, + stdout: int | str = asyncio.subprocess.STDOUT, + stderr: int | str = asyncio.subprocess.STDOUT, + capture_output: bool = False, + timeout: int | None = None, + job_name: str | None = None, + ) -> tuple[Any | None, int] | None: + ... + + async def undeploy( + self, external: bool + ) -> None: + ... + +The ``deploy`` method instantiates the remote execution environment, making it ready to receive requests for data transfers and command executions. A ``deployment`` object can be marked as ``external`` in the StreamFlow file. In that case, the ``Connector`` should assume that the execution environment is already up and running, and the ``deploy`` method should only open the necessary connections to communicate with it. + +The ``undeploy`` method destroys the remote execution environment, potentially cleaning up all the temporary resources instantiated during the workflow execution (e.g., intermediate results). If a ``deployment`` object is marked as ``external``, the ``undeploy`` method should not destroy it but just close all the connections opened by the ``deploy`` method. + +The ``get_available_locations`` method is used in the scheduling phase to obtain the locations available for job execution, identified by their unique name (see :ref:`here `). 
The method receives some optional input parameters to filter valid locations. The ``service`` parameter specifies a specific set of locations in a deployment, and its precise meaning differs for each deployment type (see :ref:`here `). The other three parameters (``input_directory``, ``output_directory``, and ``tmp_directory``) allow the ``Connector`` to return correct disk usage values for each of the three folders in case of remote instances with multiple volumes attached. + +The ``copy`` methods perform a data transfer from a ``src`` path to a ``dst`` path in one or more destination ``locations`` in the execution environment controlled by the ``Connector``. The ``read_only`` parameter notifies the ``Connector`` if the destination files will be modified in place or not. This parameter prevents unattended side effects (e.g., symlink optimizations on the remote locations). The ``copy_remote_to_remote`` method accepts two additional parameters: a ``source_location`` and an optional ``source_connector``. The latter identifies the ``Connector`` instance that controls the ``source_location`` and defaults to ``self`` when not specified. + +The ``run`` method performs a remote ``command`` execution on a remote ``location``. The ``command`` parameter is a list of arguments, mimicking the Python `subprocess `_ abstraction. Many optional parameters can be passed to the ``run`` method. The ``environment`` parameter is a dictionary of environment variables, which should be defined in the remote execution context before executing the command. The ``workdir`` parameter identifies the remote working directory. The ``stdin``, ``stdout``, and ``stderr`` parameters are used for remote stream redirection. The ``capture_output`` parameter specifies if the command output should be retrieved or not. If ``capture_output`` is set to ``True``, the ``run`` method returns the command output and return code, while it does not return anything if ``capture_output`` is set to ``False``. The ``timeout`` parameter specifies a maximum completion time for the remote execution, after which the ``run`` method throws a ``WorkflowExecutionException``. Finally, the ``job_name`` parameter is the unique identifier of a StreamFlow job, which is used for debugging purposes. + +BaseConnector +============= + +Users who want to implement their own ``Connector`` class should extend from the ``BaseConnector`` whenever possible. The StreamFlow ``BaseConnector`` implementation, defined in the ``streamflow.deployment.connector.base`` module, already provides some essential support for logging and tar-based streaming data transfers. Plus, it correctly handles :ref:`FutureConnector ` instances by extending the ``FutureAware`` base class. However, the ``BaseConnector`` does not allow wrapping inner connectors using the ``wraps`` directive (see :ref:`here `). Indeed, only connectors extending the :ref:`ConnectorWrapper ` interface support the ``wraps`` directive. + +LocalConnector +============== + +The ``LocalConnector`` class is a special subtype of the ``Connector`` instance that identifies the StreamFlow local node. As discussed above, data transfers that involve the local node are treated differently from remote-to-remote data movements. In general, several StreamFlow classes adopt different strategies when an action involves the local node or a remote one, and these decisions involve verifying if a ``Connector`` object extends the ``LocalConnector`` class. 
For this reason, users who want to provide their version of a local ``Connector`` must extend the ``LocalConnector`` class and not the ``BaseConnector`` as in other cases. + +FutureConnector +=============== + +In the ``eager`` setting, all the ``Connector`` objects deploy their related execution environment at the beginning of a workflow execution. However, to save resources, it is sometimes desirable to adopt a ``lazy`` approach, deploying each execution environment only when it receives the first request from the StreamFlow control plane. Users can switch between these behaviours by setting the ``lazy`` attribute of each ``target`` object to ``True`` (the default) or ``False`` in the StreamFlow file. + +A ``FutureConnector`` instance wraps an actual ``Connector`` instance and implements the ``lazy`` behaviour: the ``deploy`` method does nothing, and each other method calls the ``deploy`` method on the inner ``Connector`` to initialize it and delegate the action. The main drawback of this implementation is that the type checking on a ``FutureConnector`` instance will return the wrong connector type. A ``FutureAware`` class solves this issue by transparently returning the type of the inner ``Connector``. All custom ``Connector`` instances defined by the users should extend the ``FutureAware`` class directly or indirectly by extending the :ref:`BaseConnector ` or :ref:`ConnectorWrapper ` classes. + +ConnectorWrapper +================ + +StreamFlow supports :ref:`stacked locations ` using the ``wraps`` directive. However, not all ``Connector`` instances support inner connectors, but only those that extend the ``ConenctorWrapper`` interface. By default, a ``ConnectorWrapper`` instance receives an internal ``Connector`` object as a constructor parameter and delegates all the method calls to the wrapped ``Connector``. Plus, it already extends the ``FutureAware`` class, correctly handling :ref:`FutureConnector ` instances. Users who want to create a custom ``Connector`` instance with support for the ``wraps`` directive must extend the ``ConnectorWrapper`` class and not the ``BaseConnector`` as in other cases. 
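To make the delegation pattern concrete, the following sketch shows a wrapper that only overrides ``run`` to trace remote commands and then forwards the call to the inner ``Connector``; all other methods keep the default delegating behaviour. The class name and import paths are illustrative assumptions, and a real plugin may need additional registration boilerplate (e.g., a configuration schema).

.. code-block:: python

   import asyncio
   from collections.abc import MutableMapping, MutableSequence
   from typing import Any

   from streamflow.core.deployment import Location
   from streamflow.deployment.wrapper import ConnectorWrapper  # assumed module path


   class TracingConnectorWrapper(ConnectorWrapper):
       """Hypothetical wrapper logging every command before delegating it."""

       async def run(
           self,
           location: Location,
           command: MutableSequence[str],
           environment: MutableMapping[str, str] = None,
           workdir: str | None = None,
           stdin: int | str | None = None,
           stdout: int | str = asyncio.subprocess.STDOUT,
           stderr: int | str = asyncio.subprocess.STDOUT,
           capture_output: bool = False,
           timeout: int | None = None,
           job_name: str | None = None,
       ) -> tuple[Any | None, int] | None:
           # Trace the command, then let the wrapped Connector execute it.
           print(f"[{job_name}] {' '.join(command)} @ {location}")
           return await super().run(
               location, command, environment, workdir, stdin, stdout,
               stderr, capture_output, timeout, job_name)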
+ +Implementations +=============== + +======================================================= ================================================================ +Name Class +======================================================= ================================================================ +:ref:`docker ` streamflow.deployment.connector.docker.DockerConnector +:ref:`docker-compose ` streamflow.deployment.connector.docker.DockerComposeConnector +:ref:`flux ` streamflow.deployment.connector.queue_manager.FluxConnector +:ref:`helm ` streamflow.deployment.connector.kubernetes.Helm3Connector +:ref:`helm3 ` streamflow.deployment.connector.kubernetes.Helm3Connector +:ref:`kubernetes ` streamflow.deployment.connector.kubernetes.KubernetesConnector +:ref:`occam ` streamflow.deployment.connector.occam.OccamConnector +:ref:`pbs ` streamflow.deployment.connector.queue_manager.PBSConnector +:ref:`singularity ` streamflow.deployment.connector.singularity.SingularityConnector +:ref:`slurm ` streamflow.deployment.connector.queue_manager.SlurmConnector +:ref:`ssh ` streamflow.deployment.connector.ssh.SSHConnector +======================================================= ================================================================ \ No newline at end of file diff --git a/docs/source/ext/cwl-docker-translator.rst b/docs/source/ext/cwl-docker-translator.rst new file mode 100644 index 000000000..708e23775 --- /dev/null +++ b/docs/source/ext/cwl-docker-translator.rst @@ -0,0 +1,33 @@ +=================== +CWLDockerTranslator +=================== + +StreamFlow relies on a ``CWLDockerTranslator`` object to convert a CWL `DockerRequirement `_ specification into a step binding on a given :ref:`Connector ` instance. By default, the :ref:`DockerCWLDockerTranslator ` is used to spawn a :ref:`DockerConnector `. However, StreamFlow also supports translators for :ref:`Kubernetes ` and :ref:`Singularity `, and more can be implemented by the community using the :ref:`plugins ` mechanism (see :ref:`here `). + +The ``CWLDockerTranslator`` interface is defined in the ``streamflow.cwl.requirement.docker.translator`` module and exposes a single public method ``get_target``: + +.. code-block:: python + + def get_target( + self, + image: str, + output_directory: str | None, + network_access: bool, + target: Target, + ) -> Target: + ... + +The ``get_target`` method returns a ``Target`` object that contains an auto-generated ``DeploymentConfig`` that reflects the ``CWLDockerTranslator`` configuration. The ``target`` parameter contains the original ``Target`` object of the related step. If the ``Conenctor`` created by the ``CWLDockerTranslator`` extends the :ref:`ConnectorWrapper ` class and the ``wrapper`` directive is defined as ``True`` in the StreamFlow file, the newly created ``Target`` object wraps the original one. + +The other parameters derive from the CWL workflow specification. In particular, the ``image`` parameter points to the Docker image needed by the step. The ``output_directory`` parameter reflects the ``dockerOutputDirectory`` option of a CWL ``DockerRequirement``. The ``network_access`` parameter derives from the CWL `NetworkAccess `_ requirement. 
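For illustration, the following sketch shows the smallest possible translator: it ignores the container image and keeps the step bound to its original target, which may be useful when the required image is already available on the selected deployment. The class name is hypothetical, the import paths follow the module names mentioned above, and additional constructor or schema boilerplate may be required depending on the StreamFlow version.

.. code-block:: python

   from streamflow.core.deployment import Target
   from streamflow.cwl.requirement.docker.translator import CWLDockerTranslator


   class NoopCWLDockerTranslator(CWLDockerTranslator):
       """Hypothetical translator that leaves the step on its original target."""

       def get_target(
           self,
           image: str,
           output_directory: str | None,
           network_access: bool,
           target: Target,
       ) -> Target:
           # Reuse the Target object of the related step as-is.
           return target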
+ +Implementations +=============== + +=================================================== ================================================================ +Type Class +=================================================== ================================================================ +:ref:`docker ` streamflow.cwl.requirement.docker.DockerCWLDockerTranslator +:ref:`kubernetes ` streamflow.cwl.requirement.docker.KubernetesCWLDockerTranslator +:ref:`singularity ` streamflow.cwl.requirement.docker.SingularityCWLDockerTranslator +=================================================== ================================================================ diff --git a/docs/source/ext/data-manager.rst b/docs/source/ext/data-manager.rst new file mode 100644 index 000000000..aea585c9c --- /dev/null +++ b/docs/source/ext/data-manager.rst @@ -0,0 +1,84 @@ +=========== +DataManager +=========== + +The ``DataManager`` interface performs data transfers to and from remote execution locations and, for each file, keeps track of its replicas across the distributed environment. It is defined in the ``streamflow.core.data`` module and exposes several public methods: + +.. code-block:: python + + async def close(self) -> None: + ... + + def get_data_locations( + self, + path: str, + deployment: str | None = None, + location: str | None = None, + location_type: DataType | None = None, + ) -> MutableSequence[DataLocation]: + ... + + def get_source_location( + self, path: str, dst_deployment: str + ) -> DataLocation | None: + ... + + def invalidate_location( + self, location: Location, path: str + ) -> None: + ... + + def register_path( + self, + location: Location, + path: str, + relpath: str, + data_type: DataType = DataType.PRIMARY, + ) -> DataLocation: + ... + + def register_relation( + self, src_location: DataLocation, dst_location: DataLocation + ) -> None: + ... + + async def transfer_data( + self, + src_location: Location, + src_path: str, + dst_locations: MutableSequence[Location], + dst_path: str, + writable: bool = False, + ) -> None: + ... + +The ``transfer_data`` method performs a data transfer from one source location to a set of target locations, called ``src_location`` and ``dst_locations``. The ``src_path`` parameter identifies the position of the data in the source file system, while ``dst_path`` specifies where the data must be transferred in the destination file systems. Note that the destination path is always the same in all destination locations. The ``writable`` parameter states that the data will be modified in place in the destination location. This parameter prevents unattended side effects (e.g., symlink optimizations when source and destination locations are equal). + +The ``register_path`` method informs the ``DataManager`` about relevant data in a ``location`` file system at a specific ``path``. Sometimes, a file or directory is identified by a relative path, which filters out implementation-specific file system structures (e.g., the job-specific input directory). The ``relpath`` parameter contains the relevant portion of a path. The ``data_type`` parameter specifies the nature of the registered path. The available ``DataType`` identifiers are: ``PRIMARY`` for actual data; ``SYMBOLIC_LINK`` for links pointing to primary locations; ``INVALID``, which marks a ``DataLocation`` object as unavailable for future usage. + +The ``register_relation`` method informs the ``DataManager`` that two distinct locations ``src_location`` and ``dst_location`` point to the same data. 
In other words, if the related data are needed, they can be collected interchangeably from one of the two locations. + +The ``invalidate_location`` method informs the ``DataManager`` that the data registered in a ``location`` file system at a specific ``path`` are not available anymore, e.g., due to file system corruption or a failed data transfer. In practice, the ``DataType`` of the identified location is marked as ``INVALID``. + +The ``get_data_locations`` method retrieves all the valid ``DataLocation`` objects related to the ``path`` received in input. Plus, the set of locations can be further filtered by the ``deployment`` to which the location belongs, the name of the location on which the data object resides (``location_name``), or a given ``data_type``. Note that all the ``DataLocation`` objects that are marked ``INVALID`` should not be returned by this method. + +The ``get_source_location`` method receives in input a ``path`` and the name of the destination deployment ``dst_deployment``, and it returns the ``DataLocation`` object that is most suitable to act as source location for performing the data transfer. The logic used to identify the best location is implementation-dependent. If no suitable location can be found, the method returns ``None``. + +The ``close`` method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object’s lifetime, e.g., network or database connections. + +Implementations +=============== + +======= ========================================== +Type Class +======= ========================================== +default streamflow.data.manager.DefaultDataManager +======= ========================================== + +In the ``DefaultDataManager`` implementation, the distributed virtual file system is stored in memory in a dedicated data structure called ``RemotePathMapper``. The ``get_source_location`` method adopts the following strategy to choose the most suitable ``DataLocation`` object: + +1. All the valid ``DataLocation`` objects related to the given ``path`` are retrieved by calling the ``get_data_locations`` method; +2. If there exists a ``DataLocation`` object marked as ``PRIMARY`` that resides on one of the locations belonging to the ``dst_deployment``, choose it; +3. Otherwise, if there exists a ``DataLocation`` object marked as ``PRIMARY`` that resides locally on the StreamFlow node, choose it; +4. Otherwise, if any of the retrieved ``DataLocation`` objects are marked as ``PRIMARY``, randomly choose one of them; +5. Otherwise, return ``None``. \ No newline at end of file diff --git a/docs/source/ext/database.rst b/docs/source/ext/database.rst new file mode 100644 index 000000000..5cd97ec97 --- /dev/null +++ b/docs/source/ext/database.rst @@ -0,0 +1,249 @@ +======== +Database +======== + +StreamFlow relies on a persistent ``Database`` to store all the metadata regarding a workflow execution. These metadata are used for fault tolerance, provenance collection, and reporting. All StreamFlow entities interacting with the ``Database`` extend the ``PersistableEntity`` interface, adhering to the `Object-Relational Mapping (ORM) `_ programming model: + +.. code-block:: python + + def __init__(self): + self.persistent_id: int | None = None + + @classmethod + async def load( + cls, + context: StreamFlowContext, + persistent_id: int, + loading_context: DatabaseLoadingContext, + ) -> PersistableEntity: + ... + + async def save( + self, context: StreamFlowContext + ) -> None: + ... 
+ +Each ``PersistableEntity`` is identified by a unique numerical ``persistent_id`` related to the corresponding ``Database`` record. Two methods, ``save`` and ``load``, allow persisting the entity in the ``Database`` and retrieving it from the persistent record. Note that ``load`` is a class method, as it must construct a new instance. + +The ``load`` method receives three input parameters: the current execution ``context``, the ``persistent_id`` of the instance that should be loaded, and a ``loading_context``. The latter keeps track of all the objects already loaded in the current transaction, serving as a cache to efficiently load nested entities and prevent deadlocks when dealing with circular references. + +The ``Database`` interface, defined in the ``streamflow.core.persistence`` module, contains all the methods to create, modify, and retrieve this metadata. Data deletion is unnecessary, as StreamFlow never removes existing records. Internally, the ``save`` and ``load`` methods call one or more of these methods to perform the desired operations. + +.. code-block:: python + + async def add_command( + self, step_id: int, tag: str, cmd: str + ) -> int: + ... + + async def add_dependency( + self, step: int, port: int, type: DependencyType, name: str + ) -> None: + ... + + async def add_deployment( + self, + name: str, + type: str, + config: str, + external: bool, + lazy: bool, + workdir: str | None, + ) -> int: + ... + + async def add_port( + self, + name: str, + workflow_id: int, + type: type[Port], + params: MutableMapping[str, Any], + ) -> int: + ... + + async def add_provenance( + self, inputs: MutableSequence[int], token: int + ) -> None: + ... + + async def add_step( + self, + name: str, + workflow_id: int, + status: int, + type: type[Step], + params: MutableMapping[str, Any], + ) -> int: + ... + + async def add_target( + self, + deployment: int, + type: type[Target], + params: MutableMapping[str, Any], + locations: int = 1, + service: str | None = None, + workdir: str | None = None, + ) -> int: + ... + + async def add_token( + self, tag: str, type: type[Token], value: Any, port: int | None = None + ) -> int: + ... + + async def add_workflow( + self, name: str, params: MutableMapping[str, Any], status: int, type: str + ) -> int: + ... + + async def close(self) -> None: + ... + + async def get_dependees( + self, token_id: int + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_dependers( + self, token_id: int + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_command( + self, command_id: int + ) -> MutableMapping[str, Any]: + ... + + async def get_commands_by_step( + self, step_id: int + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_deployment( + self, deplyoment_id: int + ) -> MutableMapping[str, Any]: + ... + + async def get_input_ports( + self, step_id: int + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_output_ports( + self, step_id: int + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_port( + self, port_id: int + ) -> MutableMapping[str, Any]: + ... + + async def get_port_tokens( + self, port_id: int + ) -> MutableSequence[int]: + ... + + async def get_reports( + self, workflow: str, last_only: bool = False + ) -> MutableSequence[MutableSequence[MutableMapping[str, Any]]]: + ... + + async def get_step( + self, step_id: int + ) -> MutableMapping[str, Any]: + ... + + async def get_target( + self, target_id: int + ) -> MutableMapping[str, Any]: + ... 
+ + async def get_token( + self, token_id: int + ) -> MutableMapping[str, Any]: + ... + + async def get_workflow( + self, workflow_id: int + ) -> MutableMapping[str, Any]: + ... + + async def get_workflow_ports( + self, workflow_id: int + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_workflow_steps( + self, workflow_id: int + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_workflows_by_name( + self, workflow_name: str, last_only: bool = False + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def get_workflows_list( + self, name: str | None + ) -> MutableSequence[MutableMapping[str, Any]]: + ... + + async def update_command( + self, command_id: int, updates: MutableMapping[str, Any] + ) -> int: + ... + + async def update_deployment( + self, deployment_id: int, updates: MutableMapping[str, Any] + ) -> int: + ... + + async def update_port( + self, port_id: int, updates: MutableMapping[str, Any] + ) -> int: + ... + + async def update_step( + self, step_id: int, updates: MutableMapping[str, Any] + ) -> int: + ... + + async def update_target( + self, target_id: str, updates: MutableMapping[str, Any] + ) -> int: + ... + + async def update_workflow( + self, workflow_id: int, updates: MutableMapping[str, Any] + ) -> int: + ... + +There are three families of methods in the ``Database`` interface: ``add_entity``, ``update_entity``, and ``get_data``. All these methods are generic to avoid changing the interface whenever the internals of an entity slightly change. + +Each ``add_entity`` method receives in input the parameter values for each entity attribute and returns the numeric ``persistent_id`` of the created entity. Some methods also accept a ``type`` field, which identifies a particular class of entities, and a ``params`` field, a dictionary of additional entity parameters. Combined, these two features allow reusing the same method (and, optionally, the same database record structure) to store a whole hierarchy of entities inheriting from a base class. + +Each ``update_entity`` method receives in input the ``persistent_id`` of the entity that should be modified and a dictionary, called ``updates``, with the names of fields to be updated as keys and their new contents as values. All of them return the numeric ``persistent_id`` of the updated entity. + +Each ``get_data`` method receives in input the identifier (commonly the ``persistent_id``) of an entity and returns all the data related to that entity. Some methods also accept a boolean ``last_only`` parameter, which states if all the entities should be returned or just the most recent. All ``get_data`` methods return generic data structures, i.e., lists or dictionaries. The shape of each dictionary varies from one method to another and is documented in the source code. + +The ``close`` method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object’s lifetime, e.g., network or database connections. + +Implementations +=============== + +====== ============================================ +Type Class +====== ============================================ +sqlite streamflow.persistence.sqlite.SqliteDatabase +====== ============================================ + +By default, StreamFlow uses a local ``SqliteDatabase`` instance for metadata persistence. The ``connection`` directive can be set to ``:memory:`` to avoid disk I/O and improve performance. 
However, in this case, all the metadata will be erased when the workflow execution terminates. + +.. jsonschema:: ../../../streamflow/persistence/schemas/sqlite.json + +The database schema is structured as follows: + +.. literalinclude:: ../../../streamflow/persistence/schemas/sqlite.sql + :language: sql diff --git a/docs/source/ext/deployment-manager.rst b/docs/source/ext/deployment-manager.rst new file mode 100644 index 000000000..cc0be5651 --- /dev/null +++ b/docs/source/ext/deployment-manager.rst @@ -0,0 +1,47 @@ +================= +DeploymentManager +================= + +The ``DeploymentManager`` interface instantiates and manages :ref:`Connector ` objects for each ``deployment`` object described in a :ref:`StreamFlow file `. It is defined in the ``streamflow.core.deployment`` module and exposes several public methods: + +.. code-block:: python + + async def close(self) -> None: + ... + + async def deploy( + self, deployment_config: DeploymentConfig + ) -> None: + ... + + def get_connector( + self, deployment_name: str + ) -> Connector | None: + ... + + async def undeploy( + self, deployment_name: str + ) -> None: + ... + + async def undeploy_all(self) -> None: + ... + +The ``deploy`` method instantiates a ``Connector`` object starting from the given ``DeploymentConfig`` object, which derives from the ``deployments`` section in the StreamFlow file. Then, it deploys the related execution environment by calling the ``deploy`` method of the ``Connector`` object. Note that if a deployment ``wraps`` another environment (see :ref:`here `), the wrapped environment must be deployed before the wrapper one. Each ``DeploymentManager`` implementation is responsible for correctly managing these dependencies, potentially throwing a ``WorkflowDefinitionException`` in case of misconfigurations (e.g., circular dependencies). The ``DeploymentManager`` is also responsible for correctly handling concurrent calls to the ``deploy`` method with the same target deployment, e.g., to avoid spurious multiple deployments of the same infrastructure. + +The ``get_connector`` method returns the ``Connector`` object related to the ``deployment_name`` input parameter or ``None`` if the environment has not been deployed yet. Note that calling ``get_connector`` before calling ``deploy`` or after calling ``undeploy`` on the related environment should always return ``None``. + +The ``undeploy`` method undeploys the target execution infrastructure, identified by the ``deployment_name`` input parameter, by calling the ``undeploy`` method of the related ``Connector`` object. Plus, it marks the ``Connector`` object as invalid. Again, the ``DeploymentManager`` is responsible for correctly handling concurrent calls to the ``undeploy`` method with the same target deployment. + +The ``undeploy_all`` method undeploys all the active execution environments. It is equivalent to calling the ``undeploy`` method on each active deployment. StreamFlow always calls this method before terminating to clean up the execution interface. + +The ``close`` method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object's lifetime, e.g., network or database connections.
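To make the expected lifecycle more concrete, the following minimal sketch (not part of the StreamFlow codebase) shows how a caller might drive a ``DeploymentManager`` instance. The deployment name mirrors the Docker example used elsewhere in this documentation, and the ``DeploymentConfig`` keyword arguments are illustrative, as the exact constructor signature may differ between StreamFlow versions.

.. code-block:: python

    from streamflow.core.deployment import DeploymentConfig


    async def deployment_lifecycle(deployment_manager) -> None:
        # Hypothetical deployment mirroring a "deployments" entry of a StreamFlow file
        config = DeploymentConfig(
            name="docker-openjdk",
            type="docker",
            config={"image": "openjdk:9.0.1-11-slim"},
        )
        # Before deploy(), no Connector is associated with the deployment name
        assert deployment_manager.get_connector("docker-openjdk") is None
        # deploy() instantiates the Connector and deploys the related environment
        await deployment_manager.deploy(config)
        assert deployment_manager.get_connector("docker-openjdk") is not None
        # undeploy() tears down the environment and invalidates the Connector
        await deployment_manager.undeploy("docker-openjdk")
        assert deployment_manager.get_connector("docker-openjdk") is None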
+ +Implementations +=============== + +======= ====================================================== +Type Class +======= ====================================================== +default streamflow.deployment.manager.DefaultDeploymentManager +======= ====================================================== diff --git a/docs/source/ext/fault-tolerance.rst b/docs/source/ext/fault-tolerance.rst new file mode 100644 index 000000000..b8aa37060 --- /dev/null +++ b/docs/source/ext/fault-tolerance.rst @@ -0,0 +1,13 @@ +=============== +Fault tolerance +=============== + +CheckpointManager +================= + +WIP + +FailureManager +============== + +WIP \ No newline at end of file diff --git a/docs/source/ext/plugins.rst b/docs/source/ext/plugins.rst new file mode 100644 index 000000000..d9d57345e --- /dev/null +++ b/docs/source/ext/plugins.rst @@ -0,0 +1,192 @@ +======= +Plugins +======= + +The StreamFlow control plane can be extended by means of self-contained plugins. This mechanism allows advanced users to implement application-specific behaviour and researchers to experiment with new workflow orchestration ideas in a battle-tested ecosystem. Plus, it improves maintainability and reduces the memory footprint, as the StreamFlow codebase can implement just the core features. + +A StreamFlow plugin is a Python package with two characteristics: + +* A Python class extending the ``StreamFlowPlugin`` base class; +* An entry in the ``entry_points`` section of the ``setup.py`` configuration file, with key ``unito.streamflow.plugin`` and value ``<plugin_name>=<module>:<class>``, where ``<plugin_name>`` can be chosen at the discretion of the user and ``<module>:<class>`` points to the class implementing the plugin. + +The installed plugins can be explored using the StreamFlow Command Line Interface (CLI). To see the list of installed plugins, use the following subcommand: + +.. code-block:: bash + + streamflow plugin list + +To print additional details about a specific plugin called ``<plugin_name>``, including all the provided extensions, use the following subcommand: + +.. code-block:: bash + + streamflow plugin show <plugin_name> + + +Extension points +================ + +StreamFlow supports several extension points, whose interfaces are described in a dedicated section of this documentation. For each type of extension point, the ``StreamFlowPlugin`` class exposes a method to register the custom implementation. All methods have the following signature: + +.. code-block:: python + + def register_extension_point(name: str, cls: type[ExtensionPointType]) -> None: + +The ``cls`` parameter points to the class that implements the extension point, while the ``name`` parameter defines the keyword that users should put in the ``type`` field of the ``streamflow.yml`` file to instantiate this implementation. + +.. note:: + + In order to avoid name conflicts, it is a good practice to prefix the ``name`` variable with an organization-specific prefix. Unprefixed names are reserved for StreamFlow core components. However, since plugins are always loaded after core components, creating a plugin with the same name as a core implementation will override it. The same will happen when two third-party plugins have the same name, but the loading order is undefined and may vary from one Python environment to another, hindering reproducibility. + +The ``register`` method of the ``StreamFlowPlugin`` implementation must contain all the required calls to the ``register_extension_point`` methods in order to make the extension points available to the StreamFlow control plane.
Note that a single ``StreamFlowPlugin`` can register multiple extension points, and even multiple implementations for the same extension point. + +Each extension point class extends the ``SchemaEntity`` abstract class, whose method ``get_schema`` returns the path to the class configuration in `JSON Schema 2019-09 `_ format. The root entity of this file must be an ``object`` defining a set of properties, which will be exposed to the user in the ``config`` section of the ``streamflow.yml`` file, type-checked by StreamFlow at the beginning of each workflow run, and passed to the class constructor at every instantiation. + +The base class for each extension point defined by StreamFlow and the respective registration method exposed by the ``StreamFlowPlugin`` class are reported in the table below: + +============================================================================================= =================================== +Base Class Registration Method +============================================================================================= =================================== +:ref:`streamflow.core.deployment.BindingFilter ` ``register_binding_filter`` +:ref:`streamflow.core.recovery.CheckpointManager ` ``register_checkpoint_manager`` +:ref:`streamflow.cwl.requirement.docker.translator.CWLDockerTranslator ` ``register_cwl_docker_translator`` +:ref:`streamflow.core.deployment.Connector ` ``register_connector`` +:ref:`streamflow.core.data.DataManager ` ``register_data_manager`` +:ref:`streamflow.core.persistence.Database ` ``register_database`` +:ref:`streamflow.core.deployment.DeploymentManager ` ``register_deployment_manager`` +:ref:`streamflow.core.recovery.FailureManager ` ``register_failure_manager`` +:ref:`streamflow.core.scheduling.Policy ` ``register_policy`` +:ref:`streamflow.core.scheduling.Scheduler ` ``register_scheduler`` +============================================================================================= =================================== + +In addition, a ``register_schema`` method allows to register additional JSON Schema files, which are not directly referenced by any ``SchemaEntity`` class through the ``get_schema`` method. This feature is useful to, for example, define some base abstract JSON Schema that concrete entities can extend. + +Note that there is no official way to make JSON Schema files inherit properties from each other, as vanilla JSON Schema format does not support inheritance. However, it is possible to extend base schemas using the combination of `allOf `_ and `unevaluatedProperties `_ directives of JSON Schema 2019-09, as follows: + +.. code-block:: json + + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "my-schema-id.json", + "type": "object", + "allOf": [ + { + "$ref": "my-base-schema-id.json" + } + ], + "properties": {}, + "unevaluatedProperties": false + } + +.. note:: + + Since JSON Schema extension is based on the JSON Reference mechanism ``$ref``, which collects schemas through their ``$id`` field, it is a good practice to include an organization-specific fqdn in the ``$id`` field of each JSON Schema to avoid clashes. + +StreamFlow extensions can also be explored through the Command Line Interface (CLI). To print the set of installed extension instances divided by the targeted extension points, use the following subcommand: + +.. 
code-block:: bash + + streamflow ext list + +To print detailed documentation, including the associated JSON Schema, of an extension instance called ``<name>`` related to the extension point ``<extension-point>``, use the following subcommand: + +.. code-block:: bash + + streamflow ext show <extension-point> <name> + + +Example: a PostgreSQL Plugin +============================ + +As an example, suppose that a class ``PostgreSQLDatabase`` implements a `PostgreSQL `_-based implementation of the StreamFlow database. Then, a ``PostgreSQLStreamFlowPlugin`` class will have the following implementation: + +.. code-block:: python + + import os + + import pkg_resources + + from streamflow.core.persistence import Database + from streamflow.ext import StreamFlowPlugin + + class PostgreSQLDatabase(Database): + @classmethod + def get_schema(cls) -> str: + return pkg_resources.resource_filename( + __name__, os.path.join("schemas", "postgresql.json") + ) + ... + + class PostgreSQLStreamFlowPlugin(StreamFlowPlugin): + def register(self) -> None: + self.register_database("unito.postgresql", PostgreSQLDatabase) + +Each extension point class must implement a ``get_schema`` method, pointing to a `JSON Schema `_ file, which contains all the configurable parameters that can be specified by the user in the ``streamflow.yml`` file. Such parameters will be propagated to the class constructor at each invocation. For example, the ``PostgreSQLDatabase`` class specified above points to a ``schemas/postgresql.json`` schema file in the same Python module. + +A schema file should follow the `2019-09 `_ version of JSON Schema. StreamFlow uses schema files to validate the ``streamflow.yml`` file at runtime before executing a workflow instance. Plus, it relies on schema ``properties`` to print documentation when a user invokes the ``streamflow ext show`` CLI subcommand. An example of a schema file for the ``PostgreSQLDatabase`` class is the following: + +.. code-block:: json + + { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://streamflow.di.unito.it/plugins/schemas/persistence/postgresql.json", + "type": "object", + "properties": { + "dbname": { + "type": "string", + "description": "The name of the database to use" + }, + "hostname": { + "type": "string", + "description": "The database hostname or IP address" + }, + "maxConnections": { + "type": "integer", + "description": "Maximum size of the PostgreSQL connection pool. 0 means unlimited pool size", + "default": 10 + }, + "password": { + "type": "string", + "description": "Password to use when connecting to the database" + }, + "timeout": { + "type": "integer", + "description": "The timeout (in seconds) for connection operations", + "default": 20 + }, + "username": { + "type": "string", + "description": "Username to use when connecting to the database" + } + }, + "required": [ + "dbname", + "hostname", + "username", + "password" + ], + "additionalProperties": false + } + +Suppose that the ``PostgreSQLStreamFlowPlugin`` class is defined in a ``plugin.py`` file, which is part of a ``streamflow_postgresql`` module. Then, the ``pyproject.toml`` file will contain the following declaration: + +.. code-block:: toml + + [project.entry-points] + "unito.streamflow.plugin" = {"unito.postgresql" = "streamflow_postgresql.plugin:PostgreSQLStreamFlowPlugin"} + +Imagine now that the code described above has been published in a package called ``streamflow-postgresql``. Then, the plugin can be installed with ``pip`` as a normal package: + +..
code-block:: bash + + pip install streamflow-postgresql + +Then, StreamFlow users can instantiate a PostgreSQL database connector for their workflow executions by adding the following lines in the ``streamflow.yml`` file: + +.. code-block:: yaml + + database: + type: unito.postgresql + config: + dbname: "sf" + hostname: "localhost" + username: "sf-user" + password: "1234!" + maxConnections: 50 + +The full source code of the StreamFlow PostgreSQL example plugin is available in on `GitHub `_. \ No newline at end of file diff --git a/docs/source/ext/scheduling.rst b/docs/source/ext/scheduling.rst new file mode 100644 index 000000000..81758116c --- /dev/null +++ b/docs/source/ext/scheduling.rst @@ -0,0 +1,108 @@ +========== +Scheduling +========== + +StreamFlow lets user implement their scheduling infrastructure. There are two extension points related to scheduling: :ref:`Scheduler ` and :ref:`Policy `. The ``Scheduler`` interface implements all the scheduling infrastructure, including data structures, to store the global current allocation status. The ``Policy`` interface implements a specific placement strategy to map jobs onto available locations. Both interfaces are specified in the ``streamflow.core.scheduling`` module. + +In StreamFlow, the ``Job`` object is the allocation unit. Each workflow step generates zero or more ``Job`` objects sent to the scheduling infrastructure for placement. + +.. code-block:: python + + class Job: + def __init__( + self, + name: str, + workflow_id: int, + inputs: MutableMapping[str, Token], + input_directory: str | None, + output_directory: str | None, + tmp_directory: str | None, + ): + ... + +In practice, a ``Job`` is a data structure containing a unique ``name``, the ``workflow_id`` pointing to the workflow it belongs to, a dictionary of ``inputs`` containing the input data needed for execution, and three folder paths pointing to a potentially remote filesystem: ``input_directory``, ``output_directory``, ``tmp_directory``. Since the actual paths depend on the chosen execution location, these parameters are not specified before the scheduling phase. + +Scheduler +========= + +The ``Scheduler`` interface contains three abstract methods: ``schedule``, ``notify_status``, and ``close``. + +.. code-block:: python + + async def schedule( + self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware + ) -> None: + ... + + async def notify_status( + self, job_name: str, status: Status + ) -> None: + ... + + async def close( + self + ) -> None: + ... + +The ``schedule`` method tries to allocate one or more available locations for a new ``Job`` object. It receives three input parameters: a new ``Job`` object to be allocated, a ``BindingConfig`` object containing the list of potential allocation targets for the ``Job`` and a list of :ref:`BindingFilter ` objects, and a ``HardwareRequirement`` object specifying the resource requirements of the ``Job``. Resource requirements are extracted automatically from the workflow specification, e.g., `CWL `_ files. Conversely, the ``BindingFilter`` object derives from the :ref:`StreamFlow file `. + +The ``notify_status`` method is called whenever a ``Job`` object changes its status, e.g., when it starts, completes, or fails. It receives two input parameters, the name of an existing ``Job`` and its new ``Status``, and returns nothing. 
When a ``Job`` reaches a final status (i.e., ``FAILED``, ``COMPLETED``, or ``CANCELLED``), its related locations are marked as available, and the ``Scheduler`` starts a new scheduling attempt. + +The ``close`` method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object's lifetime, e.g., network or database connections. + +Implementations +--------------- + +======= ================================================ +Type Class +======= ================================================ +default streamflow.scheduling.scheduler.DefaultScheduler +======= ================================================ + +In the ``DefaultScheduler`` implementation, scheduling attempts follow a simple First Come, First Served (FCFS) approach. The ``schedule`` method demands the allocation strategy to a ``Policy`` object specified in the StreamFlow file's ``bindings`` section through a ``target`` object's ``policy`` directive. If no available allocation configuration can be found for a given ``Job``, it is queued until the next scheduling attempt. + +As discussed above, a scheduling attempt occurs whenever a ``Job`` reaches a final state. Plus, to account for dynamic resource creation and deletion in remote execution environments (e.g., through the Kubernetes `HorizontalPodAutoscaler `_) the ``DefaultScheduler`` can automatically perform a scheduling attempt for each queued ``Job`` at regular intervals. The duration of such intervals can be configured through the ``retry_delay`` parameter. A value of ``0`` (the default) turns off this behaviour. + +.. jsonschema:: ../../../streamflow/scheduling/schemas/scheduler.json + +Policy +====== + +The ``Policy`` interface contains a single method ``get_location``, which returns the ``Location`` chosen for placement or ``None`` if there is no available location. + +.. code-block:: python + + async def get_location( + self, + context: StreamFlowContext, + job: Job, + hardware_requirement: Hardware, + available_locations: MutableMapping[str, AvailableLocation], + jobs: MutableMapping[str, JobAllocation], + locations: MutableMapping[str, MutableMapping[str, LocationAllocation]], + ) -> Location | None: + ... + +The ``get_location`` method receives much information about the current execution context, enabling it to cover a broad class of potential scheduling strategies. In particular, the ``context`` parameter can query all the StreamFlow's relevant data structures, such as the :ref:`Database `, the :ref:`DataManager `, and the :ref:`DeploymentManager `. + +The ``Job`` parameter contains the ``Job`` object to be allocated, and the ``hardware_requirement`` parameter is a ``HardwareRequirement`` object specifying the ``Job``'s resource requirements. The ``available_locations`` parameter contains the list of locations available for placement in the target deployment. They are obtained by calling the ``get_available_locations`` method of the related :ref:`Connector ` object. + +The ``jobs`` and ``locations`` parameters describe the current status of the workflow execution. The ``jobs`` parameter is a dictionary of ``JobAllocation`` objects, containing information about all the previously allocated ``Job`` objects, indexed by their unique name. Each ``JobAllocation`` structure contains the ``Job`` name, its target, the list of locations associated with the ``Job`` execution, the current ``Status`` of the ``Job``, and the hardware resources allocated for its execution on each selected location. 
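For instance, a load-aware policy could use the ``jobs`` parameter to count how many previously allocated ``Job`` objects reside on each location and prefer the least loaded one. The helper below is a minimal sketch, not part of the StreamFlow codebase, and assumes that ``Location`` objects expose a ``name`` attribute:

.. code-block:: python

    from collections import Counter
    from collections.abc import MutableMapping

    from streamflow.core.scheduling import JobAllocation


    def jobs_per_location(jobs: MutableMapping[str, JobAllocation]) -> Counter:
        # Count how many allocated Jobs reside on each location, using only
        # the JobAllocation attributes described above
        counter: Counter = Counter()
        for allocation in jobs.values():
            for location in allocation.locations:
                counter[location.name] += 1
        return counter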
+ +The ``locations`` parameter is the set of locations allocated to at least one ``Job`` in the past, indexed by their deployment and unique name. Each ``LocationAllocation`` object contains the location name, the name of its deployment, and the list of ``Job`` objects allocated to it, identified by their unique name. + +Implementations +--------------- + +============= ============================================================= +Type Class +============= ============================================================= +data_locality streamflow.scheduling.policy.data_locality.DataLocalityPolicy +============= ============================================================= + +The ``DataLocalityPolicy`` is the default scheduling policy in StreamFlow. The adopted strategy is the following: + +1. File input tokens are sorted by weight in descending order; +2. All the locations containing the related files are retrieved from the :ref:`DataManager` for each token. If data are already present in one of the available locations, that location is chosen for placement; +3. If data-driven allocation is not possible, one location is randomly picked up from the remaining ones; +4. If there are no available locations, return ``None`` (and queue the ``Job``). diff --git a/docs/source/architecture.rst b/docs/source/guide/architecture.rst similarity index 73% rename from docs/source/architecture.rst rename to docs/source/guide/architecture.rst index 51efff6d6..5cacf6f6f 100644 --- a/docs/source/architecture.rst +++ b/docs/source/guide/architecture.rst @@ -2,7 +2,7 @@ Architecture ============ -.. image:: images/streamflow-model.png +.. image:: ../images/streamflow-model.png :alt: StreamFlow logical stack :width: 70% :align: center @@ -26,4 +26,15 @@ To provide enough flexibility, StreamFlow adopts a three-layered hierarchical re * A **deployment** is an entire multi-agent infrastructure and constitutes the *unit of deployment*, i.e., all its components are always co-allocated while executing a step. * A **service** is a single agent type in a deployment and constitutes the *unit of binding*, i.e., each step of a workflow can be offloaded to a single service for execution. -* A **location** is a single instance of a potentially replicated service and constitutes the *unit of scheduling*, i.e., each step of a workflow is offloaded to a configurable number of service locations to be processed. \ No newline at end of file +* A **location** is a single instance of a potentially replicated service and constitutes the *unit of scheduling*, i.e., each step of a workflow is offloaded to a configurable number of service locations to be processed. + +Workflow operations +=================== + +You need three different components to run a hybrid workflow with StreamFlow: + +* A :ref:`workflow description `, i.e. a representation of your application as a graph. +* One or more :ref:`deployment descriptions `, i.e. infrastructure-as-code representations of your execution environments. +* A :ref:`StreamFlow file ` to bind each step of your workflow with the most suitable execution environment. + +StreamFlow will automatically take care of all the secondary aspects, like checkpointing, fault-tolerance, data movements, etc. 
\ No newline at end of file diff --git a/docs/source/guide/bind.rst b/docs/source/guide/bind.rst new file mode 100644 index 000000000..41d5a48e5 --- /dev/null +++ b/docs/source/guide/bind.rst @@ -0,0 +1,57 @@ +=================== +Put it all together +=================== + +The entrypoint of each StreamFlow execution is a YAML file, conventionally called ``streamflow.yml``. The role of such a file is to link each task in a workflow with the service that should execute it. + +A valid StreamFlow file contains the ``version`` number (currently ``v1.0``) and two main sections: ``workflows`` and ``deployments``. The ``workflows`` section consists of a dictionary with uniquely named workflows to be executed in the current run, while the ``deployments`` section contains a dictionary of uniquely named deployment specifications. + +Describing deployments +---------------------- + +Each deployment entry contains two main sections. The ``type`` field identifies which ``Connector`` implementation should be used for its creation, destruction and management. It should refer to one of the StreamFlow connectors described :ref:`here `. The ``config`` field instead contains a dictionary of configuration parameters which are specific to each ``Connector`` class. + +Describing workflows +-------------------- + +Each workflow entry contains three main sections. The ``type`` field identifies which language has been used to describe it (currently the only supported value is ``cwl``), the ``config`` field includes the paths to the files containing such a description, and the ``bindings`` section is a list of step-deployment associations that specifies where the execution of a specific step should be offloaded. + +In particular, the ``config`` section of CWL workflows contains a mandatory ``file`` entry that points to the workflow description file (usually a ``*.cwl`` file similar to the example reported :ref:`here `) and an optional ``settings`` entry that points to a secondary file, containing the initial inputs of the workflow. + +Binding steps and deployments +----------------------------- + +Each entry in the ``bindings`` list contains a ``step`` directive referring to a specific step in the workflow, and a ``target`` directive referring to a deployment entry in the ``deployments`` section of the StreamFlow file. + +Each step can refer to either a single command or a nested sub-workflow. Steps are uniquely identified by means of a Posix-like path, where each simple task is mapped to a file and each sub-workflow is mapped to a folder. In particular, the most external workflow description is always mapped to the root folder ``/``. Considering the example reported :ref:`here `, you should specify ``/compile`` in the ``step`` directive to identify the ``compile`` step, or ``/`` to identify the entire workflow. + +The ``target`` directive binds the step with a specific service in a StreamFlow deployment. As discussed in the :doc:`architecture section `, complex deployments can contain multiple services, which represent the unit of binding in StreamFlow. The best way to identify services in a deployment strictly depends on the deployment specification itself. + +For example, in DockerCompose it is quite straightforward to uniquely identify each service by using its key in the ``services`` dictionary. Conversely, in Kubernetes we explicitly require users to label containers in a Pod with a unique identifier through the ``name`` attribute, in order to unambiguously identify them at deploy time.
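For instance, the following fragment binds the ``/compile`` step to the ``openjdk`` service of a hypothetical Docker Compose deployment through a ``service`` entry in the ``target`` directive (the deployment and service names are purely illustrative):

.. code-block:: yaml

    bindings:
      - step: /compile
        target:
          deployment: docker-compose-example   # hypothetical deployment entry
          service: openjdk                     # key of the service in the Compose file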
+ +Simpler deployments like single Docker or Singularity containers do not need a service layer, since the deployment contains a single service that is automatically uniquely identified. + +Example +------- + +The following snippet contains an example of a minimal ``streamflow.yml`` file, connecting the ``compile`` step of :ref:`this ` workflow with an ``openjdk`` Docker container. + +.. code-block:: yaml + + version: v1.0 + workflows: + extract-and-compile: + type: cwl + config: + file: main.cwl + settings: config.yml + bindings: + - step: /compile + target: + deployment: docker-openjdk + + deployments: + docker-openjdk: + type: docker + config: + image: openjdk:9.0.1-11-slim \ No newline at end of file diff --git a/docs/source/guide/cwl.rst b/docs/source/guide/cwl.rst new file mode 100644 index 000000000..ef2f1eef6 --- /dev/null +++ b/docs/source/guide/cwl.rst @@ -0,0 +1,65 @@ +=================== +Write your workflow +=================== + +StreamFlow relies on the `Common Workflow Language `_ (CWL) standard to describe workflows. In particular, it supports version ``v1.2`` of the standard, which introduces conditional execution of workflow steps. + +The reader is referred to the `official CWL documentation `_ to learn how the workflow description language works, as StreamFlow does not introduce any modification to the original specification. + +.. note:: + StreamFlow supports all the features required by the CWL standard conformance, and nearly all optional features, for versions ``v1.0``, ``v1.1``, and ``v1.2``. For a complete overview of CWL conformance status, look :ref:`here `. + +The following snippet contain a simple example of CWL workflow, which extracts a Java source file from a tar archive and compiles it. + +.. code-block:: yaml + + cwlVersion: v1.2 + class: Workflow + inputs: + tarball: File + name_of_file_to_extract: string + + outputs: + compiled_class: + type: File + outputSource: compile/classfile + + steps: + untar: + run: + class: CommandLineTool + baseCommand: [tar, --extract] + inputs: + tarfile: + type: File + inputBinding: + prefix: --file + extractfile: string + outputs: + extracted_file: + type: File + outputBinding: + glob: $(inputs.extractfile) + in: + tarfile: tarball + extractfile: name_of_file_to_extract + out: [extracted_file] + + compile: + run: + class: CommandLineTool + baseCommand: javac + arguments: ["-d", $(runtime.outdir)] + inputs: + src: + type: File + inputBinding: + position: 1 + outputs: + classfile: + type: File + outputBinding: + glob: "*.class" + in: + src: untar/extracted_file + out: [classfile] \ No newline at end of file diff --git a/docs/source/guide/deployments.rst b/docs/source/guide/deployments.rst new file mode 100644 index 000000000..8ed6362ba --- /dev/null +++ b/docs/source/guide/deployments.rst @@ -0,0 +1,18 @@ +======================= +Import your environment +======================= + +StreamFlow relies on external specifications and tools to describe and orchestrate a remote execution environment. For example, a Kubernetes-based deployment can be described in Helm, while a resource reservation request on an HPC facility can be specified with Slurm or PBS files. + +This feature allows users to stick with the technologies they already know, or at least with production-grade tools that are solid, maintained and well-documented. Moreover, it adheres to the `infrastructure-as-code `_ principle, making execution environments easily portable and self-documented. 
+ +The lifecycle management of each StreamFlow deployment is delegated to a specific implementation of the ``Connector`` interface. Connectors provided by default in the StreamFlow codebase are reported :ref:`here `, but users can add new connectors to the list by simply creating their own implementation of the ``Connector`` interface. + +The following snippet contains a simple example of a Docker deployment named ``docker-openjdk``, which instantiates a container from the ``openjdk:9.0.1-11-slim`` image. At runtime, StreamFlow creates a :ref:`DockerConnector ` instance to manage the container lifecycle. + +.. code-block:: yaml + + docker-openjdk: + type: docker + config: + image: openjdk:9.0.1-11-slim diff --git a/docs/source/guide/inspect.rst b/docs/source/guide/inspect.rst new file mode 100644 index 000000000..de51dba9b --- /dev/null +++ b/docs/source/guide/inspect.rst @@ -0,0 +1,65 @@ +===================== +Inspect workflow runs +===================== + +The StreamFlow Command Line Interface (CLI) offers several features to inspect workflow runs and collect metadata from them. It is possible to :ref:`list ` past workflow executions, retrieving generic metadata such as execution time and completion status. It is also possible to generate a :ref:`report ` of a specific execution. Finally, it is possible to generate a :ref:`provenance archive ` for a given workflow, ready to be shared and published. + +List executed workflows +======================= + +The history of workflow executions initiated by the user can be printed using the following subcommand: + +.. code-block:: bash + + streamflow list + +The resulting table will contain, for each workflow name, the workflow type and the number of executions associated with that name. For example: + +=================== ==== ========== +NAME TYPE EXECUTIONS +=================== ==== ========== +my-workflow-example cwl 2 +=================== ==== ========== + +To obtain more details related to the different runs of the ``<name>`` workflow, i.e., start time, end time, and final status, use the following subcommand: + +.. code-block:: bash + + streamflow list <name> + +For example: + +================================ ================================ ========== +START_TIME END_TIME STATUS +================================ ================================ ========== +2023-03-14T10:44:11.304081+00:00 2023-03-14T10:44:18.345231+00:00 FAILED +2023-03-14T10:45:28.305321+00:00 2023-03-14T10:46:21.274293+00:00 COMPLETED +================================ ================================ ========== + +Generate a report +================= + +To generate a timeline report of a workflow execution, use the following subcommand: + +.. code-block:: bash + + streamflow report <name> + +By default, an interactive ``HTML`` report is generated, but users can specify a different format through the ``--format`` option. + +Collect provenance data +======================= + +StreamFlow supports the `Workflow Run RO-Crate `_ provenance format, an `RO-Crate `_ profile for capturing the provenance of an execution of a computational workflow. + +To generate a provenance archive containing the last execution of a given workflow name (see :ref:`above `), use the following command: + +.. code-block:: bash + + streamflow prov <name> + +The ``--all`` option can instead be used to include the whole history of workflow executions inside a single archive. + +The ``--name`` option defines the name of the archive. By default, the archive will take the workflow name as basename and ``.crate.zip`` as extension.
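For instance, to archive the last run of the ``my-workflow-example`` workflow listed above under a custom archive name (the file name is purely illustrative), one could run:

.. code-block:: bash

    streamflow prov my-workflow-example --name my-workflow-example.crate.zip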
+ +The ``--outdir`` option states in which location the archive will be placed (by default, it will be created in the current directory). \ No newline at end of file diff --git a/docs/source/guide/install.rst b/docs/source/guide/install.rst new file mode 100644 index 000000000..c2bbd1c0f --- /dev/null +++ b/docs/source/guide/install.rst @@ -0,0 +1,23 @@ +======= +Install +======= + +You can install StreamFlow as a Python package with ``pip``, run it in a `Docker `_ container or deploy it on `Kubernetes `_ with `Helm `_. + +Pip +=== + +The StreamFlow module is available on `PyPI `_, so you can install it using the following command:: + + pip install streamflow + +Please note that StreamFlow requires ``python >= 3.8`` to be installed on the system. Then you can execute your workflows through the StreamFlow CLI:: + + streamflow /path/to/streamflow.yml + +Docker +====== + +StreamFlow Docker images are available on `Docker Hub `_. To download the latest StreamFlow image, you can use the following command:: + + docker pull alphaunito/streamflow:latest diff --git a/docs/source/guide/run.rst b/docs/source/guide/run.rst new file mode 100644 index 000000000..205a09f99 --- /dev/null +++ b/docs/source/guide/run.rst @@ -0,0 +1,65 @@ +================= +Run your workflow +================= + +To run a workflow with the StreamFlow CLI, simply use the following command: + +.. code-block:: bash + + streamflow run /path/to/streamflow.yml + +.. note:: + For CWL workflows, StreamFlow also supports the ``cwl-runner`` interface (more details :ref:`here `). + +The ``--outdir`` option specifies where StreamFlow must store the workflow results and the execution metadata. Metadata are collected and managed by the StreamFlow ``Database`` implementtion (see :ref:`here `). By default, StreamFlow uses the current directory as its output folder. + +The ``--name`` option allows to specify a workflow name for the current execution. Note that multiple execution can have the same name, meaning that they are multiple instances of the same workflow. If a ``--name`` is not explicitly provided, StreamFlow will randomly generate a unique name for the current execution. + +The ``--color`` option allows to print log preamble with colors related to the logging level, which can be useful for live demos and faster log inspections. + +Run on Docker +============= + +The command below gives an example of how to execute a StreamFlow workflow in a Docker container: + +.. code-block:: bash + + docker run -d \ + --mount type=bind,source="$(pwd)"/my-project,target=/streamflow/project \ + --mount type=bind,source="$(pwd)"/results,target=/streamflow/results \ + --mount type=bind,source="$(pwd)"/tmp,target=/tmp/streamflow \ + alphaunito/streamflow \ + streamflow run /streamflow/project/streamflow.yml + +.. note:: + A StreamFlow project, containing a ``streamflow.yml`` file and all the other relevant dependencies (e.g. a CWL description of the workflow steps and a Helm description of the execution environment) needs to be mounted as a volume inside the container, for example in the ``/streamflow/project`` folder. + + By default, workflow outputs will be stored in the ``/streamflow/results`` folder. Therefore, it is necessary to mount such location as a volume in order to persist the results. + + StreamFlow will save all its temporary files inside the ``/tmp/streamflow`` location. For debugging purposes, or in order to improve I/O performances in case of huge files, it could be useful to mount also such location as a volume. 
+ + By default, the StreamFlow :ref:`Database ` stores workflow metadata in the ``${HOME}/.streamflow`` folder. Mounting this floder as a volume preserve these metadata for further inspection (see :ref:`here `). + +.. warning:: + All the container-based connectors (i.e., ``DockerConnector``, ``DockerComposeConnector`` and ``SingularityConnector``) are not supported from inside a Docker container, as running nested containers is a non-trivial task. + +Run on Kubernetes +================= + +It is also possible to execute the StreamFlow container as a `Job `_ in Kubernetes, with the same characteristics and restrictions discussed for the :ref:`Docker ` case. A Helm template of a StreamFlow Job can be found :repo:`here `. + +In this case, the StreamFlow ``HelmConnector`` is able to deploy Helm charts directly on the parent cluster, relying on `ServiceAccount `_ credentials. In order to do that, the ``inCluster`` option must be set to ``true`` for each involved module on the ``streamflow.yml`` file + +.. code-block:: yaml + + deployments: + helm-deployment: + type: helm + config: + inCluster: true + ... + +A Helm template of a StreamFlow Job can be found `here `_. + +.. warning:: + In case `RBAC `_ is active on the Kubernetes cluster, a proper RoleBinding must be attached to the ServiceAccount object, in order to give StreamFlow the permissions to manage deployments of pods and executions of tasks. \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index b325cf3a2..f48e0adfb 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -9,7 +9,7 @@ It has been designed around two main principles: 1. Allowing the execution of tasks in **multi-container environments**, in order to support concurrent execution of multiple communicating tasks in a multi-agent ecosystem. 2. Relaxing the requirement of a single shared data space, in order to allow for **hybrid workflow** executions on top of multi-cloud or hybrid cloud/HPC infrastructures. -StreamFlow source code is available on `GitHub `_ under the LGPLv3 license. If you want to cite StreamFlow, please refer to this article: +StreamFlow source code is available on :repo:`GitHub <.>` under the LGPLv3 license. If you want to cite StreamFlow, please refer to this article: .. code-block:: text @@ -38,9 +38,21 @@ For LaTeX users, the following BibTeX entry can be used: :caption: Getting Started :hidden: - install.rst - architecture.rst - operations.rst + guide/install.rst + guide/architecture.rst + guide/cwl.rst + guide/deployments.rst + guide/bind.rst + guide/run.rst + guide/inspect.rst + +.. toctree:: + :caption: Advanced Features + :hidden: + + advanced/multiple-targets.rst + advanced/port-targets.rst + advanced/stacked-locations.rst .. toctree:: :caption: CWL Standard @@ -48,6 +60,21 @@ For LaTeX users, the following BibTeX entry can be used: cwl/cwl-conformance.rst cwl/cwl-runner.rst + cwl/docker-requirement.rst + +.. toctree:: + :caption: Extension Points + :hidden: + + ext/plugins.rst + ext/binding-filter.rst + ext/cwl-docker-translator.rst + ext/connector.rst + ext/data-manager.rst + ext/database.rst + ext/deployment-manager.rst + ext/fault-tolerance.rst + ext/scheduling.rst .. 
toctree:: :caption: Connectors @@ -57,8 +84,18 @@ For LaTeX users, the following BibTeX entry can be used: connector/docker-compose.rst connector/flux.rst connector/helm3.rst + connector/kubernetes.rst connector/occam.rst connector/pbs.rst + connector/queue-manager.rst connector/singularity.rst connector/slurm.rst connector/ssh.rst + +.. toctree:: + :caption: CWL Docker Translators + :hidden: + + cwl/docker/docker.rst + cwl/docker/kubernetes.rst + cwl/docker/singularity.rst \ No newline at end of file diff --git a/docs/source/install.rst b/docs/source/install.rst deleted file mode 100644 index eda7e381d..000000000 --- a/docs/source/install.rst +++ /dev/null @@ -1,54 +0,0 @@ -======= -Install -======= - -You can install StreamFlow as a Python package with ``pip``, run it in a `Docker `_ container or deploy it on `Kubernetes `_ with `Helm `_. - -Pip -=== - -The StreamFlow module is available on `PyPI `_, so you can install it using the following command:: - - pip install streamflow - -Please note that StreamFlow requires ``python >= 3.8`` to be installed on the system. Then you can execute your workflows through the StreamFlow CLI:: - - streamflow /path/to/streamflow.yml - -Docker -====== - -StreamFlow Docker images are available on `Docker Hub `_. To download the latest StreamFlow image, you can use the following command:: - - docker pull alphaunito/streamflow:latest - -The command below gives an example of how to execute a StreamFlow workflow in a Docker container: - -.. code-block:: bash - - docker run -d \ - --mount type=bind,source="$(pwd)"/my-project,target=/streamflow/project \ - --mount type=bind,source="$(pwd)"/results,target=/streamflow/results \ - --mount type=bind,source="$(pwd)"/tmp,target=/tmp/streamflow \ - alphaunito/streamflow \ - /streamflow/project/streamflow.yml - -.. note:: - A StreamFlow project, containing a ``streamflow.yml`` file and all the other relevant dependencies (e.g. a CWL description of the workflow steps and a Helm description of the execution environment) need to be mounted as a volume inside the container, for example in the ``/streamflow/project`` folder. - - By default, workflow outputs will be stored in the ``/streamflow/results`` folder. Therefore, it is necessary to mount such location as a volume in order to persist the results. - - StreamFlow will save all its temporary files inside the ``/tmp/streamflow`` location. For debugging purposes, or in order to improve I/O performances in case of huge files, it could be useful to mount also such location as a volume. - -.. warning:: - All the container-based connectors (i.e., ``DockerConnector``, ``DockerComposeConnector`` and ``SingularityConnector``) are not supported from inside a Docker container, as running nested containers is a non-trivial task. - -Kubernetes -========== - -It is also possible to execute the StreamFlow container as a `Job `_ in Kubernetes, with the same characteristics and restrictions discussed for the :ref:`Docker ` case. A Helm template of a StreamfFlow Job can be found :repo:`here `. - -In this case, the StreamFlow ``HelmConnector`` is able to deploy Helm charts directly on the parent cluster, relying on `ServiceAccount `_ credentials. - -.. warning:: - In case `RBAC `_ is active on the Kubernetes cluster, a proper RoleBinding must be attached to the ServiceAccount object, in order to give StreamFlow the permissions to manage deployments of pods and executions of tasks. 
\ No newline at end of file diff --git a/docs/source/operations.rst b/docs/source/operations.rst deleted file mode 100644 index b14a3ab2c..000000000 --- a/docs/source/operations.rst +++ /dev/null @@ -1,172 +0,0 @@ -========== -Operations -========== - -As shown in the :doc:`architecture ` section, you need three different components to run a hybrid workflow with StreamFlow: - -* A :ref:`workflow description `, i.e. a representation of your application as a graph. -* One or more :ref:`deployment descriptions `, i.e. infrastructure-as-code representations of your execution environments. -* A :ref:`StreamFlow file ` to bind each step of your workflow with the most suitable execution environment. - -StreamFlow will automatically take care of all the secondary aspects, like checkpointing, fault-tolerance, data movements, etc. - -Write your workflow -=================== - -StreamFlow relies on the `Common Workflow Language `_ (CWL) standard to describe workflows. In particular, it supports version ``v1.2`` of the standard, which introduces conditional execution of workflow steps. - -The reader is referred to the `official CWL documentation `_ to learn how the workflow description language works, as StreamFlow does not introduce any modification to the original specification. - -.. note:: - StreamFlow supports all the features required by the CWL standard conformance, and nearly all optional features. For a complete overview of CWL conformance status, look :ref:`here `. - -The following snippet contain a simple example of CWL workflow, which extracts a Java source file from a tar archive and compiles it. - -.. code-block:: yaml - - cwlVersion: v1.2 - class: Workflow - inputs: - tarball: File - name_of_file_to_extract: string - - outputs: - compiled_class: - type: File - outputSource: compile/classfile - - steps: - untar: - run: - class: CommandLineTool - baseCommand: [tar, --extract] - inputs: - tarfile: - type: File - inputBinding: - prefix: --file - outputs: - example_out: - type: File - outputBinding: - glob: hello.txt - in: - tarfile: tarball - extractfile: name_of_file_to_extract - out: [extracted_file] - - compile: - run: - class: CommandLineTool - baseCommand: javac - arguments: ["-d", $(runtime.outdir)] - inputs: - src: - type: File - inputBinding: - position: 1 - outputs: - classfile: - type: File - outputBinding: - glob: "*.class" - in: - src: untar/extracted_file - out: [classfile] - -Import your environment -======================= - -StreamFlow relies on external specification and tools to describe and orchestrate remote execution environment. As an example, a Kubernetes-based deployment can be described in Helm, while a resource reservation request on a HPC facility can be specified with either a Slurm or PBS files. - -This feature allows users to stick with the technologies they already know, or at least with production grade tools that are solid, maintained and well documented. Moreover, it adheres to the `infrastructure-as-code `_ principle, making execution environments easily portable and self-documented. - -The lifecycle management of each StreamFlow deployment is demanded to a specific implementation of the ``Connector`` interface. Connectors provided by default in the StreamFlow codebase are reported in the table below, but users can add new connectors to the list by simply creating their own implementation of the ``Connector`` interface. 
- -======================================================= ================================================================ -Name Class -======================================================= ================================================================ -:ref:`docker ` streamflow.deployment.connector.docker.DockerConnector -:ref:`docker-compose ` streamflow.deployment.connector.docker.DockerComposeConnector -:ref:`helm ` streamflow.deployment.connector.kubernetes.Helm3Connector -:ref:`helm3 ` streamflow.deployment.connector.kubernetes.Helm3Connector -:ref:`occam ` streamflow.deployment.connector.occam.OccamConnector -:ref:`pbs ` streamflow.deployment.connector.queue_manager.PBSConnector -:ref:`singularity ` streamflow.deployment.connector.singularity.SingularityConnector -:ref:`slurm ` streamflow.deployment.connector.queue_manager.SlurmConnector -:ref:`ssh ` streamflow.deployment.connector.ssh.SSHConnector -======================================================= ================================================================ - -Put it all together -=================== - -The entrypoint of each StreamFlow execution is a YAML file, conventionally called ``streamflow.yml``. The role of such file is to link each task in a workflow with the service that should execute it. - -A valid StreamFlow file contains the ``version`` number (currently ``v1.0``) and two main sections: ``workflows`` and ``deployments``. The ``workflows`` section consists of a dictionary with uniquely named workflows to be executed in the current run, while the ``deployments`` section contains a dictionary of uniquely named deployment specifications. - -Describing deployments ------------------ - -Each deployment entry contains two main sections. The ``type`` field identifies which ``Connector`` implementation should be used for its creation, destruction and management. It should refer to one of the StreamFlow connectors described :ref:`above `. The ``config`` field instead contains a dictionary of configuration parameters which are specific to each ``Connector`` class. - -Describing workflows --------------------- - -Each workflow entry contains three main sections. The ``type`` field identifies which language has been used to describe it (currently the only supported value is ``cwl``), the ``config`` field includes the paths to the files containing such description, and the ``bindings`` section is a list of step-deployment associations that specifies where the execution of a specific step should be offloaded. - -In particular, CWL workflows ``config`` contain a mandatory ``file`` entry that points to the workflow description file (usually a ``*.cwl`` file similar to the example reported :ref:`above `) and an optional ``settings`` entry that points to a secondary file, containing the initial inputs of the workflow. - -Binding steps and deployments ------------------------------ - -Each entry in the ``bindings`` contains a ``step`` directive referring to a specific step in the workflow, and a ``target`` directive referring to a deployment entry in the ``deployments`` section of the StreamFlow file. - -Each step can refer to either a single command or a nested sub-workflow. Steps are uniquely identified by means of a Posix-like path, where each simple task is mapped to a file and each sub-workflow is mapped to a folder. In partiuclar, the most external workflow description is always mapped to the root folder ``/``. 
Considering the example reported :ref:`above `, you should specify ``/compile`` in the ``step`` directive to identify the ``compile`` step, or ``/`` to identify the entire workflow. - -The ``target`` directive binds the step with a specific service in a StreamFlow deployment. As discussed in the :doc:`architecture section `, complex deployments can contain multiple services, which represent the unit of binding in StreamFlow. The best way to identify services in a deployment strictly depends on the deployment specification itself. For example, in DockerCompose it is quite straightforward to uniquely identify each service by using its key in the ``services`` dictionary. Conversely, in Kubernetes we explicitly require users to label containers in a Pod with a unique identifier through the ``name`` attribute, in order to unambiguously identify them at deploy time. - -Simpler deployments like single Docker or Singularity containers do not need a service layer, since the deployment contains a single service that is automatically uniquely identified. - -Example -------- - -The following snippet contains an example of a minimal ``streamflow.yml`` file, connecting the ``compile`` step of the previous workflow with an ``openjdk`` Docker container. - -.. code-block:: yaml - - version: v1.0 - workflows: - extract-and-compile: - type: cwl - config: - file: main.cwl - settings: config.yml - bindings: - - step: /compile - target: - deployment: docker-openjdk - - deployments: - docker-openjdk: - type: docker - config: - image: openjdk:9.0.1-11-slim - -Run your workflow -================= - -To run a workflow with the StreamFlow CLI, simply use the following command: - -.. code-block:: bash - - streamflow run /path/to/streamflow.yml - -.. note:: - For CWL workflows, StreamFlow also supports the ``cwl-runner`` interface (more details :ref:`here `). - -The ``--outdir`` option specifies where StreamFlow must store the workflow results and the execution metadata, collected in a ``.streamflow`` folder. By default, StreamFlow uses the current directory as its output folder. - -Generate a report -================= - -The ``streamflow report`` subcommand can generate a timeline report of workflow execution. A user must execute it in the parent directory of the ``.streamflow`` folder described :ref:`above `. By default, an interactive ``HTML`` report is generated, but users can specify a different format through the ``--format`` option. \ No newline at end of file diff --git a/streamflow/core/data.py b/streamflow/core/data.py index 021a50438..282d65b87 100644 --- a/streamflow/core/data.py +++ b/streamflow/core/data.py @@ -60,7 +60,7 @@ def __init__(self, context: StreamFlowContext): self.context: StreamFlowContext = context @abstractmethod - async def close(self): + async def close(self) -> None: ... @abstractmethod @@ -68,8 +68,8 @@ def get_data_locations( self, path: str, deployment: str | None = None, - location: str | None = None, - location_type: DataType | None = None, + location_name: str | None = None, + data_type: DataType | None = None, ) -> MutableSequence[DataLocation]: ... @@ -102,12 +102,12 @@ def register_relation( @abstractmethod async def transfer_data( self, - src_locations: MutableSequence[Location], + src_location: Location, src_path: str, dst_locations: MutableSequence[Location], dst_path: str, writable: bool = False, - ): + ) -> None: ... 
diff --git a/streamflow/core/deployment.py b/streamflow/core/deployment.py index 42e41098f..98a09f957 100644 --- a/streamflow/core/deployment.py +++ b/streamflow/core/deployment.py @@ -6,7 +6,6 @@ import posixpath import tempfile from abc import abstractmethod -from enum import Enum from typing import TYPE_CHECKING, Type, cast from streamflow.core import utils @@ -72,14 +71,33 @@ def __init__(self, deployment_name: str, config_dir: str): self.config_dir: str = config_dir @abstractmethod - async def copy( + async def copy_local_to_remote( self, src: str, dst: str, locations: MutableSequence[Location], - kind: ConnectorCopyKind, + read_only: bool = False, + ) -> None: + ... + + @abstractmethod + async def copy_remote_to_local( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + read_only: bool = False, + ) -> None: + ... + + @abstractmethod + async def copy_remote_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + source_location: Location, source_connector: Connector | None = None, - source_location: Location | None = None, read_only: bool = False, ) -> None: ... @@ -119,22 +137,16 @@ async def undeploy(self, external: bool) -> None: ... -class ConnectorCopyKind(Enum): - LOCAL_TO_REMOTE = 1 - REMOTE_TO_LOCAL = 2 - REMOTE_TO_REMOTE = 3 - - class DeploymentManager(SchemaEntity): def __init__(self, context: StreamFlowContext) -> None: self.context: StreamFlowContext = context @abstractmethod - async def close(self): + async def close(self) -> None: ... @abstractmethod - async def deploy(self, deployment_config: DeploymentConfig): + async def deploy(self, deployment_config: DeploymentConfig) -> None: ... @abstractmethod @@ -142,11 +154,7 @@ def get_connector(self, deployment_name: str) -> Connector | None: ... @abstractmethod - def is_deployed(self, deployment_name: str): - ... - - @abstractmethod - async def undeploy(self, deployment_name: str): + async def undeploy(self, deployment_name: str) -> None: ... @abstractmethod @@ -270,7 +278,7 @@ async def save(self, context: StreamFlowContext) -> None: self.persistent_id = await context.database.add_target( deployment=self.deployment.persistent_id, type=type(self), - params=json.dumps(await self._save_additional_params(context)), + params=await self._save_additional_params(context), locations=self.locations, service=self.service, workdir=self.workdir, diff --git a/streamflow/core/persistence.py b/streamflow/core/persistence.py index e71a2a336..2fc6d5cb6 100644 --- a/streamflow/core/persistence.py +++ b/streamflow/core/persistence.py @@ -115,17 +115,26 @@ async def add_deployment( @abstractmethod async def add_port( - self, name: str, workflow_id: int, type: type[Port], params: str + self, + name: str, + workflow_id: int, + type: type[Port], + params: MutableMapping[str, Any], ) -> int: ... @abstractmethod - async def add_provenance(self, inputs: MutableSequence[int], token: int): + async def add_provenance(self, inputs: MutableSequence[int], token: int) -> None: ... @abstractmethod async def add_step( - self, name: str, workflow_id: int, status: int, type: type[Step], params: str + self, + name: str, + workflow_id: int, + status: int, + type: type[Step], + params: MutableMapping[str, Any], ) -> int: ... @@ -134,7 +143,7 @@ async def add_target( self, deployment: int, type: type[Target], - params: str, + params: MutableMapping[str, Any], locations: int = 1, service: str | None = None, workdir: str | None = None, @@ -148,11 +157,13 @@ async def add_token( ... 
@abstractmethod - async def add_workflow(self, name: str, params: str, status: int, type: str) -> int: + async def add_workflow( + self, name: str, params: MutableMapping[str, Any], status: int, type: str + ) -> int: ... @abstractmethod - async def close(self): + async def close(self) -> None: ... @abstractmethod @@ -224,25 +235,25 @@ async def get_workflow(self, workflow_id: int) -> MutableMapping[str, Any]: ... @abstractmethod - async def get_workflows_by_name( - self, workflow_name: str, last_only: bool = False + async def get_workflow_ports( + self, workflow_id: int ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod - async def get_workflow_ports( + async def get_workflow_steps( self, workflow_id: int ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod - async def get_workflow_steps( - self, workflow_id: int + async def get_workflows_by_name( + self, workflow_name: str, last_only: bool = False ) -> MutableSequence[MutableMapping[str, Any]]: ... @abstractmethod - async def list_workflows( + async def get_workflows_list( self, name: str | None ) -> MutableSequence[MutableMapping[str, Any]]: ... diff --git a/streamflow/core/scheduling.py b/streamflow/core/scheduling.py index 5923b84a5..60aded54f 100644 --- a/streamflow/core/scheduling.py +++ b/streamflow/core/scheduling.py @@ -129,13 +129,13 @@ def __init__( self, job: str, target: Target, - locations: MutableSequence[AvailableLocation], + locations: MutableSequence[Location], status: Status, hardware: Hardware, ): self.job: str = job self.target: Target = target - self.locations: MutableSequence[AvailableLocation] = locations + self.locations: MutableSequence[Location] = locations self.status: Status = status self.hardware: Hardware = hardware @@ -192,7 +192,6 @@ async def get_location( self, context: StreamFlowContext, job: Job, - deployment: str, hardware_requirement: Hardware, available_locations: MutableMapping[str, AvailableLocation], jobs: MutableMapping[str, JobAllocation], @@ -210,7 +209,7 @@ def __init__(self, context: StreamFlowContext): ] = {} @abstractmethod - async def close(self): + async def close(self) -> None: ... def get_allocation(self, job_name: str) -> JobAllocation | None: @@ -246,7 +245,7 @@ def get_service(self, job_name: str) -> str | None: return allocation.target.service if allocation else None @abstractmethod - async def notify_status(self, job_name: str, status: Status): + async def notify_status(self, job_name: str, status: Status) -> None: ... 
@abstractmethod diff --git a/streamflow/core/workflow.py b/streamflow/core/workflow.py index 2c7ee1253..3e76398ce 100644 --- a/streamflow/core/workflow.py +++ b/streamflow/core/workflow.py @@ -172,16 +172,16 @@ def __init__( name: str, workflow_id: int, inputs: MutableMapping[str, Token], - input_directory: str, - output_directory: str, - tmp_directory: str, + input_directory: str | None, + output_directory: str | None, + tmp_directory: str | None, ): self.name: str = name self.workflow_id: int = workflow_id self.inputs: MutableMapping[str, Token] = inputs - self.input_directory: str = input_directory - self.output_directory: str = output_directory - self.tmp_directory: str = tmp_directory + self.input_directory: str | None = input_directory + self.output_directory: str | None = output_directory + self.tmp_directory: str | None = tmp_directory @classmethod async def _load( @@ -326,7 +326,7 @@ async def save(self, context: StreamFlowContext) -> None: name=self.name, workflow_id=self.workflow.persistent_id, type=type(self), - params=json.dumps(await self._save_additional_params(context)), + params=await self._save_additional_params(context), ) @@ -472,7 +472,7 @@ async def save(self, context: StreamFlowContext) -> None: workflow_id=self.workflow.persistent_id, status=cast(int, self.status.value), type=type(self), - params=json.dumps(await self._save_additional_params(context)), + params=await self._save_additional_params(context), ) save_tasks = [] for name, port in self.get_input_ports().items(): @@ -716,7 +716,7 @@ async def save(self, context: StreamFlowContext) -> None: if not self.persistent_id: self.persistent_id = await self.context.database.add_workflow( name=self.name, - params=json.dumps(await self._save_additional_params(context)), + params=await self._save_additional_params(context), status=Status.WAITING.value, type=self.type, ) diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index d75f33667..2b93e92a8 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -420,7 +420,7 @@ async def _prepare_work_dir( base_path, path_processor.basename(src_path) ) await self.step.workflow.context.data_manager.transfer_data( - src_locations=[selected_location], + src_location=selected_location, src_path=src_path, dst_locations=locations, dst_path=dest_path, diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py index 2c70e2aa0..69281aa8c 100644 --- a/streamflow/cwl/step.py +++ b/streamflow/cwl/step.py @@ -440,7 +440,7 @@ async def _update_file_token( ) # Perform and transfer await self.workflow.context.data_manager.transfer_data( - src_locations=[selected_location], + src_location=selected_location, src_path=selected_location.path, dst_locations=dst_locations, dst_path=filepath, @@ -460,7 +460,7 @@ async def _update_file_token( data_locations = self.workflow.context.data_manager.get_data_locations( path=filepath, deployment=dst_connector.deployment_name, - location_type=DataType.SYMBOLIC_LINK, + data_type=DataType.SYMBOLIC_LINK, ) # If the remote location is not a symbolic link, perform remote checksum original_checksum = token_value["checksum"] diff --git a/streamflow/cwl/token.py b/streamflow/cwl/token.py index 0bdf4b7c4..5b2a76023 100644 --- a/streamflow/cwl/token.py +++ b/streamflow/cwl/token.py @@ -16,7 +16,7 @@ async def _get_file_token_weight(context: StreamFlowContext, value: Any): else: if path := utils.get_path_from_token(value): data_locations = context.data_manager.get_data_locations( - path=path, location_type=DataType.PRIMARY + path=path, 
data_type=DataType.PRIMARY ) if data_locations: location = list(data_locations)[0] @@ -44,7 +44,7 @@ async def _get_file_token_weight(context: StreamFlowContext, value: Any): async def _is_file_token_available(context: StreamFlowContext, value: Any) -> bool: if path := utils.get_path_from_token(value): data_locations = context.data_manager.get_data_locations( - path=path, location_type=DataType.PRIMARY + path=path, data_type=DataType.PRIMARY ) return len(data_locations) != 0 else: diff --git a/streamflow/data/__init__.py b/streamflow/data/__init__.py index 0d342fdf7..7bf7a2a2a 100644 --- a/streamflow/data/__init__.py +++ b/streamflow/data/__init__.py @@ -1,3 +1,3 @@ -from streamflow.data.data_manager import DefaultDataManager +from streamflow.data.manager import DefaultDataManager data_manager_classes = {"default": DefaultDataManager} diff --git a/streamflow/data/data_manager.py b/streamflow/data/manager.py similarity index 89% rename from streamflow/data/data_manager.py rename to streamflow/data/manager.py index 721a4376d..b2b48ac93 100644 --- a/streamflow/data/data_manager.py +++ b/streamflow/data/manager.py @@ -10,7 +10,6 @@ from streamflow.core.data import DataLocation, DataManager, DataType from streamflow.core.deployment import Connector, Location from streamflow.data import remotepath -from streamflow.deployment.connector.base import ConnectorCopyKind from streamflow.deployment.connector.local import LocalConnector from streamflow.deployment.utils import get_path_processor @@ -29,29 +28,26 @@ async def _copy( writable: False, ) -> None: if isinstance(src_connector, LocalConnector): - await dst_connector.copy( + await dst_connector.copy_local_to_remote( src=src, dst=dst, locations=dst_locations, - kind=ConnectorCopyKind.LOCAL_TO_REMOTE, read_only=not writable, ) elif isinstance(dst_connector, LocalConnector): - await src_connector.copy( + await src_connector.copy_remote_to_local( src=src, dst=dst, locations=[src_location], - kind=ConnectorCopyKind.REMOTE_TO_LOCAL, read_only=not writable, ) else: - await dst_connector.copy( + await dst_connector.copy_remote_to_remote( src=src, dst=dst, locations=dst_locations, - kind=ConnectorCopyKind.REMOTE_TO_REMOTE, - source_connector=src_connector, source_location=src_location, + source_connector=src_connector, read_only=not writable, ) @@ -61,14 +57,108 @@ def __init__(self, context: StreamFlowContext): super().__init__(context) self.path_mapper = RemotePathMapper(context) - async def _transfer_from_location( + async def close(self): + pass + + def get_data_locations( + self, + path: str, + deployment: str | None = None, + location_name: str | None = None, + data_type: DataType | None = None, + ) -> MutableSequence[DataLocation]: + data_locations = self.path_mapper.get( + path, data_type, deployment, location_name + ) + data_locations = [ + loc for loc in data_locations if loc.data_type != DataType.INVALID + ] + return data_locations + + @classmethod + def get_schema(cls) -> str: + return pkg_resources.resource_filename( + __name__, os.path.join("schemas", "data_manager.json") + ) + + def get_source_location( + self, path: str, dst_deployment: str + ) -> DataLocation | None: + if data_locations := self.get_data_locations(path=path): + dst_connector = self.context.deployment_manager.get_connector( + dst_deployment + ) + same_connector_locations = { + loc + for loc in data_locations + if loc.deployment == dst_connector.deployment_name + } + if same_connector_locations: + for loc in same_connector_locations: + if loc.data_type == DataType.PRIMARY: + 
return loc + return list(same_connector_locations)[0] + else: + local_locations = { + loc + for loc in data_locations + if isinstance( + self.context.deployment_manager.get_connector(loc.deployment), + LocalConnector, + ) + } + if local_locations: + for loc in local_locations: + if loc.data_type == DataType.PRIMARY: + return loc + return list(local_locations)[0] + else: + for loc in data_locations: + if loc.data_type == DataType.PRIMARY: + return loc + return list(data_locations)[0] + else: + return None + + def invalidate_location(self, location: Location, path: str) -> None: + self.path_mapper.invalidate_location(location, path) + + def register_path( + self, + location: Location, + path: str, + relpath: str | None = None, + data_type: DataType = DataType.PRIMARY, + ) -> DataLocation: + data_location = DataLocation( + path=path, + relpath=relpath or path, + deployment=location.deployment, + service=location.service, + name=location.name, + data_type=data_type, + available=True, + ) + self.path_mapper.put(path=path, data_location=data_location, recursive=True) + self.context.checkpoint_manager.register(data_location) + return data_location + + def register_relation( + self, src_location: DataLocation, dst_location: DataLocation + ) -> None: + data_locations = self.path_mapper.get(src_location.path) + for data_location in list(data_locations): + self.path_mapper.put(data_location.path, dst_location) + self.path_mapper.put(dst_location.path, data_location) + + async def transfer_data( self, src_location: Location, - src: str, + src_path: str, dst_locations: MutableSequence[Location], - dst: str, - writable: bool, - ): + dst_path: str, + writable: bool = False, + ) -> None: src_connector = self.context.deployment_manager.get_connector( src_location.deployment ) @@ -76,12 +166,12 @@ async def _transfer_from_location( dst_locations[0].deployment ) # Create destination folder - await remotepath.mkdir(dst_connector, dst_locations, str(Path(dst).parent)) + await remotepath.mkdir(dst_connector, dst_locations, str(Path(dst_path).parent)) # Follow symlink for source path - src = await remotepath.follow_symlink( - self.context, src_connector, src_location, src + src_path = await remotepath.follow_symlink( + self.context, src_connector, src_location, src_path ) - primary_locations = self.path_mapper.get(src, DataType.PRIMARY) + primary_locations = self.path_mapper.get(src_path, DataType.PRIMARY) copy_tasks = [] remote_locations = [] data_locations = [] @@ -95,12 +185,12 @@ async def _transfer_from_location( # If yes, perform a symbolic link if possible if not writable: await remotepath.symlink( - dst_connector, dst_location, primary_loc.path, dst + dst_connector, dst_location, primary_loc.path, dst_path ) self.path_mapper.create_and_map( location_type=DataType.SYMBOLIC_LINK, - src_path=src, - dst_path=dst, + src_path=src_path, + dst_path=dst_path, dst_deployment=dst_connector.deployment_name, dst_service=dst_location.service, dst_location=dst_location.name, @@ -116,17 +206,19 @@ async def _transfer_from_location( src=primary_loc.path, dst_connector=dst_connector, dst_locations=[dst_location], - dst=dst, + dst=dst_path, writable=True, ) ) ) data_locations.append( self.path_mapper.put( - path=dst, + path=dst_path, data_location=DataLocation( - path=dst, - relpath=list(self.path_mapper.get(src))[0].relpath, + path=dst_path, + relpath=list(self.path_mapper.get(src_path))[ + 0 + ].relpath, deployment=dst_connector.deployment_name, service=dst_location.service, data_type=DataType.PRIMARY, @@ -143,10 +235,10 
@@ async def _transfer_from_location( if writable: data_locations.append( self.path_mapper.put( - path=dst, + path=dst_path, data_location=DataLocation( - path=dst, - relpath=list(self.path_mapper.get(src))[0].relpath, + path=dst_path, + relpath=list(self.path_mapper.get(src_path))[0].relpath, deployment=dst_connector.deployment_name, data_type=DataType.PRIMARY, service=dst_location.service, @@ -159,8 +251,8 @@ async def _transfer_from_location( data_locations.append( self.path_mapper.create_and_map( location_type=DataType.PRIMARY, - src_path=src, - dst_path=dst, + src_path=src_path, + dst_path=dst_path, dst_deployment=dst_connector.deployment_name, dst_service=dst_location.service, dst_location=dst_location.name, @@ -173,10 +265,10 @@ async def _transfer_from_location( _copy( src_connector=src_connector, src_location=src_location, - src=src, + src=src_path, dst_connector=dst_connector, dst_locations=remote_locations, - dst=dst, + dst=dst_path, writable=writable, ) ) @@ -186,118 +278,6 @@ async def _transfer_from_location( for data_location in data_locations: data_location.available.set() - async def close(self): - pass - - def get_data_locations( - self, - path: str, - deployment: str | None = None, - location: str | None = None, - location_type: DataType | None = None, - ) -> MutableSequence[DataLocation]: - data_locations = self.path_mapper.get(path, location_type, deployment, location) - data_locations = [ - loc for loc in data_locations if loc.data_type != DataType.INVALID - ] - return data_locations - - @classmethod - def get_schema(cls) -> str: - return pkg_resources.resource_filename( - __name__, os.path.join("schemas", "data_manager.json") - ) - - def get_source_location( - self, path: str, dst_deployment: str - ) -> DataLocation | None: - if data_locations := self.get_data_locations(path=path): - dst_connector = self.context.deployment_manager.get_connector( - dst_deployment - ) - same_connector_locations = { - loc - for loc in data_locations - if loc.deployment == dst_connector.deployment_name - } - if same_connector_locations: - for loc in same_connector_locations: - if loc.data_type == DataType.PRIMARY: - return loc - return list(same_connector_locations)[0] - else: - local_locations = { - loc - for loc in data_locations - if isinstance( - self.context.deployment_manager.get_connector(loc.deployment), - LocalConnector, - ) - } - if local_locations: - for loc in local_locations: - if loc.data_type == DataType.PRIMARY: - return loc - return list(local_locations)[0] - else: - for loc in data_locations: - if loc.data_type == DataType.PRIMARY: - return loc - return list(data_locations)[0] - else: - return None - - def invalidate_location(self, location: Location, path: str) -> None: - self.path_mapper.invalidate_location(location, path) - - def register_path( - self, - location: Location, - path: str, - relpath: str | None = None, - data_type: DataType = DataType.PRIMARY, - ) -> DataLocation: - data_location = DataLocation( - path=path, - relpath=relpath or path, - deployment=location.deployment, - service=location.service, - name=location.name, - data_type=data_type, - available=True, - ) - self.path_mapper.put(path=path, data_location=data_location, recursive=True) - self.context.checkpoint_manager.register(data_location) - return data_location - - def register_relation( - self, src_location: DataLocation, dst_location: DataLocation - ) -> None: - data_locations = self.path_mapper.get(src_location.path) - for data_location in list(data_locations): - 
self.path_mapper.put(data_location.path, dst_location) - self.path_mapper.put(dst_location.path, data_location) - - async def transfer_data( - self, - src_locations: MutableSequence[Location], - src_path: str, - dst_locations: MutableSequence[Location], - dst_path: str, - writable: bool = False, - ): - # Get connectors and locations from steps - await asyncio.gather( - *( - asyncio.create_task( - self._transfer_from_location( - src_location, src_path, dst_locations, dst_path, writable - ) - ) - for src_location in src_locations - ) - ) - class RemotePathNode: __slots__ = ("children", "locations") diff --git a/streamflow/data/remotepath.py b/streamflow/data/remotepath.py index 47e3f4622..79a7acef1 100644 --- a/streamflow/data/remotepath.py +++ b/streamflow/data/remotepath.py @@ -167,8 +167,8 @@ async def follow_symlink( if locations := context.data_manager.get_data_locations( path=path, deployment=connector.deployment_name, - location=location.name, - location_type=DataType.PRIMARY, + location_name=location.name, + data_type=DataType.PRIMARY, ): # If there is only one primary location on the site, return its path if len(locations) == 1: diff --git a/streamflow/deployment/__init__.py b/streamflow/deployment/__init__.py index 44ba6845e..270b901a5 100644 --- a/streamflow/deployment/__init__.py +++ b/streamflow/deployment/__init__.py @@ -1,3 +1,3 @@ -from streamflow.deployment.deployment_manager import DefaultDeploymentManager +from streamflow.deployment.manager import DefaultDeploymentManager deployment_manager_classes = {"default": DefaultDeploymentManager} diff --git a/streamflow/deployment/connector/base.py b/streamflow/deployment/connector/base.py index 76f4d64b0..68eae0044 100644 --- a/streamflow/deployment/connector/base.py +++ b/streamflow/deployment/connector/base.py @@ -6,21 +6,20 @@ import posixpath import shlex import tarfile -from abc import ABCMeta, abstractmethod +from abc import abstractmethod from typing import MutableSequence, TYPE_CHECKING from streamflow.core import utils from streamflow.core.data import StreamWrapperContext from streamflow.core.deployment import ( Connector, - ConnectorCopyKind, LOCAL_LOCATION, Location, ) from streamflow.core.exception import WorkflowExecutionException from streamflow.core.utils import get_local_to_remote_destination from streamflow.deployment import aiotarstream -from streamflow.deployment.future import FutureConnector +from streamflow.deployment.future import FutureAware from streamflow.deployment.stream import ( StreamReaderWrapper, StreamWriterWrapper, @@ -56,23 +55,10 @@ async def extract_tar_stream( await tar.extract(member, os.path.normpath(os.path.join(dst, member.path))) -class FutureMeta(ABCMeta): - def __instancecheck__(cls, instance): - if isinstance(instance, FutureConnector): - return super().__subclasscheck__(instance.type) - else: - return super().__instancecheck__(instance) - - -class FutureAware(metaclass=FutureMeta): - __slots__ = () - - class BaseConnector(Connector, FutureAware): def __init__(self, deployment_name: str, config_dir: str, transferBufferSize: int): super().__init__(deployment_name, config_dir) self.transferBufferSize: int = transferBufferSize - self.is_deployed: bool = False async def _copy_local_to_remote( self, @@ -249,85 +235,91 @@ def _get_stream_reader(self, location: Location, src: str) -> StreamWrapperConte ) ) - async def copy( + async def copy_local_to_remote( self, src: str, dst: str, locations: MutableSequence[Location], - kind: ConnectorCopyKind, - source_connector: Connector | None = None, - 
source_location: Location | None = None, read_only: bool = False, ) -> None: - if kind == ConnectorCopyKind.REMOTE_TO_REMOTE: - if source_location is None: - raise Exception( - "Source location is mandatory for remote to remote copy" - ) - if logger.isEnabledFor(logging.INFO): - if len(locations) > 1: - logger.info( - "COPYING {src} on location {src_loc} to {dst} on locations:\n\t{locations}".format( - src_loc=source_location, - src=src, - dst=dst, - locations="\n\t".join([str(loc) for loc in locations]), - ) - ) - else: - logger.info( - "COPYING {src} on location {src_loc} to {dst} on location {location}".format( - src_loc=source_location, - src=src, - dst=dst, - location=locations[0], - ) - ) - await self._copy_remote_to_remote( - src=src, - dst=dst, - locations=locations, - source_connector=source_connector, - source_location=source_location, - read_only=read_only, - ) - elif kind == ConnectorCopyKind.LOCAL_TO_REMOTE: - if logger.isEnabledFor(logging.INFO): - if len(locations) > 1: - logger.info( - "COPYING {src} on local file-system to {dst} on locations:\n\t{locations}".format( - src=src, - dst=dst, - locations="\n\t".join([str(loc) for loc in locations]), - ) + if logger.isEnabledFor(logging.INFO): + if len(locations) > 1: + logger.info( + "COPYING {src} on local file-system to {dst} on locations:\n\t{locations}".format( + src=src, + dst=dst, + locations="\n\t".join([str(loc) for loc in locations]), ) - else: - logger.info( - "COPYING {src} on local file-system to {dst} {location}".format( - src=src, - dst=dst, - location=( - "on local file-system" - if locations[0].name == LOCAL_LOCATION - else f"on location {locations[0]}" - ), - ) + ) + else: + logger.info( + "COPYING {src} on local file-system to {dst} {location}".format( + src=src, + dst=dst, + location=( + "on local file-system" + if locations[0].name == LOCAL_LOCATION + else f"on location {locations[0]}" + ), ) - await self._copy_local_to_remote( - src=src, dst=dst, locations=locations, read_only=read_only + ) + await self._copy_local_to_remote( + src=src, dst=dst, locations=locations, read_only=read_only + ) + + async def copy_remote_to_local( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + read_only: bool = False, + ) -> None: + if len(locations) > 1: + raise Exception("Copy from multiple locations is not supported") + if logger.isEnabledFor(logging.INFO): + logger.info( + f"COPYING {src} on location {locations[0]} to {dst} on local file-system" ) - elif kind == ConnectorCopyKind.REMOTE_TO_LOCAL: + await self._copy_remote_to_local( + src=src, dst=dst, location=locations[0], read_only=read_only + ) + + async def copy_remote_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + source_location: Location, + source_connector: Connector | None = None, + read_only: bool = False, + ): + if logger.isEnabledFor(logging.INFO): if len(locations) > 1: - raise Exception("Copy from multiple locations is not supported") - if logger.isEnabledFor(logging.INFO): logger.info( - f"COPYING {src} on location {locations[0]} to {dst} on local file-system" + "COPYING {src} on location {src_loc} to {dst} on locations:\n\t{locations}".format( + src_loc=source_location, + src=src, + dst=dst, + locations="\n\t".join([str(loc) for loc in locations]), + ) ) - await self._copy_remote_to_local( - src=src, dst=dst, location=locations[0], read_only=read_only - ) - else: - raise NotImplementedError + else: + logger.info( + "COPYING {src} on location {src_loc} to {dst} on location {location}".format( + 
src_loc=source_location, + src=src, + dst=dst, + location=locations[0], + ) + ) + await self._copy_remote_to_remote( + src=src, + dst=dst, + locations=locations, + source_location=source_location, + source_connector=source_connector, + read_only=read_only, + ) async def run( self, diff --git a/streamflow/deployment/connector/schemas/flux.json b/streamflow/deployment/connector/schemas/flux.json index 0577c4397..9cec55ff1 100644 --- a/streamflow/deployment/connector/schemas/flux.json +++ b/streamflow/deployment/connector/schemas/flux.json @@ -10,7 +10,7 @@ "properties": { "beginTime": { "type": "string", - "description": "Convenience option for setting a ``begin-time`` dependency for a job. The job is guaranteed to start after the specified date and time" + "description": "Convenience option for setting a begin-time dependency for a job. The job is guaranteed to start after the specified date and time" }, "brokerOpts": { "type": "array", diff --git a/streamflow/deployment/connector/schemas/kubernetes.json b/streamflow/deployment/connector/schemas/kubernetes.json index 1b00e9fab..c594be08f 100644 --- a/streamflow/deployment/connector/schemas/kubernetes.json +++ b/streamflow/deployment/connector/schemas/kubernetes.json @@ -16,7 +16,7 @@ }, "inCluster": { "type": "boolean", - "description": "If true, the Helm connector will use a ServiceAccount to connect to the Kubernetes cluster. This is useful when StreamFlow runs directly inside a Kubernetes Pod", + "description": "If true, the Kubernetes connector will use a ServiceAccount to connect to the cluster. This is useful when StreamFlow runs directly inside a Kubernetes Pod", "default": false }, "kubeContext": { @@ -34,7 +34,7 @@ }, "namespace": { "type": "string", - "description": "Namespace to install the release into", + "description": "Namespace to deploy into", "default": "Current kube config namespace" }, "locationsCacheSize": { @@ -70,7 +70,7 @@ }, "wait": { "type": "boolean", - "description": "If set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment are in a ready state before marking the release as successful. It will wait for as long as timeout", + "description": "If set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment are in a ready state before marking the deployment as successful. It will wait for as long as timeout", "default": true } }, diff --git a/streamflow/deployment/connector/schemas/queue_manager.json b/streamflow/deployment/connector/schemas/queue_manager.json index 252ce09cb..9e0aeca2d 100644 --- a/streamflow/deployment/connector/schemas/queue_manager.json +++ b/streamflow/deployment/connector/schemas/queue_manager.json @@ -1,6 +1,7 @@ { "$schema": "https://json-schema.org/draft/2019-09/schema", "$id": "https://streamflow.di.unito.it/schemas/deployment/connector/queue_manager.json", + "$$target": "queue_manager.json", "type": "object", "properties": { "checkHostKey": { @@ -40,7 +41,7 @@ }, "maxConnections": { "type": "integer", - "description": "Maximum number of concurrent connection to open for a single SSH node", + "description": "(**Deprecated.** Use the `wraps` directive to wrap a standalone SSH connector.) Maximum number of concurrent connection to open for a single SSH node", "default": 1 }, "passwordFile": { @@ -62,7 +63,7 @@ }, "transferBufferSize": { "type": "integer", - "description": "Buffer size allocated for local and remote data transfers", + "description": "(**Deprecated.** Use the `wraps` directive to wrap a standalone SSH connector.) 
Buffer size allocated for local and remote data transfers", "default": "64kiB" }, "tunnel": { diff --git a/streamflow/deployment/connector/schemas/slurm.json b/streamflow/deployment/connector/schemas/slurm.json index 7715ccbce..e5a5a9a11 100644 --- a/streamflow/deployment/connector/schemas/slurm.json +++ b/streamflow/deployment/connector/schemas/slurm.json @@ -22,7 +22,7 @@ }, "batch": { "type": "string", - "description": "Nodes can have features assigned to them by the Slurm administrator. Users can specify which of these features are required by their batch script using this options. The ``batch`` argument must be a subset of the job's ``constraint`` argument" + "description": "Nodes can have features assigned to them by the Slurm administrator. Users can specify which of these features are required by their batch script using this options. The batch argument must be a subset of the job's constraint argument" }, "bb": { "type": "string", @@ -38,7 +38,7 @@ }, "clusterConstraint": { "type": "string", - "description": "Specifies features that a federated cluster must have to have a sibling job submitted to it. Slurm will attempt to submit a sibling job to a cluster if it has at least one of the specified features. If the ``!`` option is included, Slurm will attempt to submit a sibling job to a cluster that has none of the specified features" + "description": "Specifies features that a federated cluster must have to have a sibling job submitted to it. Slurm will attempt to submit a sibling job to a cluster if it has at least one of the specified features. If the ! option is included, Slurm will attempt to submit a sibling job to a cluster that has none of the specified features" }, "clusters": { "type": "string", @@ -74,7 +74,7 @@ }, "cpusPerGpu": { "type": "integer", - "description": "Advise Slurm that ensuing job steps will require ncpus processors per allocated GPU. Not compatible with the ``cpusPerTask`` option" + "description": "Advise Slurm that ensuing job steps will require ncpus processors per allocated GPU. Not compatible with the cpusPerTask option" }, "cpusPerTask": { "type": "integer", @@ -105,7 +105,7 @@ "type": "string" } ], - "description": "The job allocation can not share nodes with other running jobs (or just other users with the ``user`` option or with the ``mcs`` option). If ``user``/``mcs`` are not specified (i.e. the job allocation can not share nodes with other running jobs), the job is allocated all CPUs and GRES on all nodes in the allocation, but is only allocated as much memory as it requested" + "description": "The job allocation can not share nodes with other running jobs (or just other users with the user option or with the mcs option). If user/mcs are not specified (i.e. the job allocation can not share nodes with other running jobs), the job is allocated all CPUs and GRES on all nodes in the allocation, but is only allocated as much memory as it requested" }, "export": { "type": "string", @@ -120,7 +120,7 @@ "type": "string" } ], - "description": "If a number between 3 and ``OPEN_MAX`` is specified as the argument to this option, a readable file descriptor will be assumed (``STDIN`` and ``STDOUT`` are not supported as valid arguments). Otherwise a filename is assumed. 
Export environment variables defined in ``filename`` or read from ``fd`` to the job's execution environment" + "description": "If a number between 3 and OPEN_MAX is specified as the argument to this option, a readable file descriptor will be assumed (STDIN and STDOUT are not supported as valid arguments). Otherwise a filename is assumed. Export environment variables defined in filename or read from fd to the job's execution environment" }, "extraNodeInfo": { "type": "string", @@ -139,7 +139,7 @@ "type": "string" } ], - "description": "This option will tell sbatch to retrieve the login environment variables for the user specified in the ``uid`` option. Be aware that any environment variables already set in sbatch's environment will take precedence over any environment variables in the user's login environment. The optional timeout value is in seconds (default: 8)" + "description": "This option will tell sbatch to retrieve the login environment variables for the user specified in the uid option. Be aware that any environment variables already set in sbatch's environment will take precedence over any environment variables in the user's login environment. The optional timeout value is in seconds (default: 8)" }, "gid": { "anyOf": [ @@ -150,7 +150,7 @@ "type": "string" } ], - "description": "Submit the job with group's group access permissions. The ``gid`` option may be the group name or the numerical group ID" + "description": "Submit the job with group's group access permissions. The gid option may be the group name or the numerical group ID" }, "gpuBind": { "type": "string", @@ -162,19 +162,19 @@ }, "gpus": { "type": "string", - "description": "Specify the total number of GPUs required for the job. An optional GPU type specification can be supplied (e.g., ``volta:3``)" + "description": "Specify the total number of GPUs required for the job. An optional GPU type specification can be supplied (e.g., volta:3)" }, "gpusPerNode": { "type": "string", - "description": "Specify the number of GPUs required for the job on each node included in the job's resource allocation. An optional GPU type specification can be supplied (e.g., ``volta:3``)" + "description": "Specify the number of GPUs required for the job on each node included in the job's resource allocation. An optional GPU type specification can be supplied (e.g., volta:3)" }, "gpusPerSocket": { "type": "string", - "description": "Specify the number of GPUs required for the job on each socket included in the job's resource allocation. An optional GPU type specification can be supplied (e.g., ``volta:3``)" + "description": "Specify the number of GPUs required for the job on each socket included in the job's resource allocation. An optional GPU type specification can be supplied (e.g., volta:3)" }, "gpusPerTask": { "type": "string", - "description": "Specify the number of GPUs required for the job on each task to be spawned in the job's resource allocation. An optional GPU type specification can be supplied (e.g., ``volta:3``)" + "description": "Specify the number of GPUs required for the job on each task to be spawned in the job's resource allocation. An optional GPU type specification can be supplied (e.g., volta:3)" }, "gres": { "type": "string", @@ -186,11 +186,11 @@ }, "hint": { "type": "string", - "description": "Bind tasks according to application hints. This option cannot be used in conjunction with ``ntasksPerCore``, ``threadsPerCore``, or ``extraNodeInfo``" + "description": "Bind tasks according to application hints. 
This option cannot be used in conjunction with ntasksPerCore, threadsPerCore, or extraNodeInfo" }, "ignorePBS": { "type": "boolean", - "description": "Ignore all ``#PBS`` and ``#BSUB`` options specified in the batch script" + "description": "Ignore all #PBS and #BSUB options specified in the batch script" }, "jobName": { "type": "string", @@ -206,7 +206,7 @@ }, "mailUser": { "type": "string", - "description": "User to receive email notification of state changes as defined by ``mailType``. The default value is the submitting user" + "description": "User to receive email notification of state changes as defined by mailType. The default value is the submitting user" }, "mcsLabel": { "type": "string", @@ -250,7 +250,7 @@ }, "nodefile": { "type": "string", - "description": "Much like ``nodelist``, but the list is contained in a file of name node file" + "description": "Much like nodelist, but the list is contained in a file of name node file" }, "nodelist": { "type": "string", @@ -258,7 +258,7 @@ }, "nodes": { "type": "string", - "description": "Request that a minimum of minnodes nodes be allocated to this job. A maximum node count may also be specified with maxnodes. If only one number is specified, this is used as both the minimum and maximum node count. Node count can be also specified as ``size_string``. The ``size_string`` specification identifies what nodes values should be used" + "description": "Request that a minimum of minnodes nodes be allocated to this job. A maximum node count may also be specified with maxnodes. If only one number is specified, this is used as both the minimum and maximum node count. Node count can be also specified as size_string. The size_string specification identifies what nodes values should be used" }, "ntasks": { "type": "integer", @@ -302,7 +302,7 @@ }, "prefer": { "type": "string", - "description": "Nodes can have features assigned to them by the Slurm administrator. Users can specify which of these features are desired but not required by their job using the prefer option. This option operates independently from ``constraint`` and will override whatever is set there if possible" + "description": "Nodes can have features assigned to them by the Slurm administrator. Users can specify which of these features are desired but not required by their job using the prefer option. This option operates independently from constraint and will override whatever is set there if possible" }, "priority": { "type": "string", @@ -310,7 +310,7 @@ }, "profile": { "type": "string", - "description": "Enables detailed data collection by the ``acct_gather_profile`` plugin" + "description": "Enables detailed data collection by the acct_gather_profile plugin" }, "propagate": { "type": "string", @@ -334,7 +334,7 @@ }, "signal": { "type": "string", - "description": "When a job is within ``sig_time`` seconds of its end time, send it the signal ``sig_num``. Due to the resolution of event handling by Slurm, the signal may be sent up to 60 seconds earlier than specified" + "description": "When a job is within sig_time seconds of its end time, send it the signal sig_num. Due to the resolution of event handling by Slurm, the signal may be sent up to 60 seconds earlier than specified" }, "socketsPerNode": { "type": "integer", @@ -358,7 +358,7 @@ }, "timeMin": { "type": "string", - "description": "Set a minimum time limit on the job allocation. 
If specified, the job may have its ``time`` limit lowered to a value no lower than ``timeMin`` if doing so permits the job to begin execution earlier than otherwise possible" + "description": "Set a minimum time limit on the job allocation. If specified, the job may have its time limit lowered to a value no lower than timeMin if doing so permits the job to begin execution earlier than otherwise possible" }, "tmp": { "type": "integer", @@ -377,7 +377,7 @@ "type": "string" } ], - "description": "Attempt to submit and/or run a job as ``user`` instead of the invoking user id. ``user`` may be the user name or numerical user ID" + "description": "Attempt to submit and/or run a job as user instead of the invoking user id. user may be the user name or numerical user ID" }, "useMinNodes": { "type": "boolean", diff --git a/streamflow/deployment/future.py b/streamflow/deployment/future.py index 3bef4027d..81f392677 100644 --- a/streamflow/deployment/future.py +++ b/streamflow/deployment/future.py @@ -2,9 +2,10 @@ import asyncio import logging +from abc import ABCMeta from typing import Any, MutableMapping, MutableSequence -from streamflow.core.deployment import Connector, ConnectorCopyKind, Location +from streamflow.core.deployment import Connector, Location from streamflow.core.scheduling import AvailableLocation from streamflow.log_handler import logger @@ -26,14 +27,53 @@ def __init__( self.deploy_event: asyncio.Event = asyncio.Event() self.connector: Connector | None = None - async def copy( + async def copy_local_to_remote( self, src: str, dst: str, locations: MutableSequence[Location], - kind: ConnectorCopyKind, + read_only: bool = False, + ) -> None: + if self.connector is None: + if not self.deploying: + self.deploying = True + await self.deploy(self.external) + else: + await self.deploy_event.wait() + await self.connector.copy_local_to_remote( + src=src, + dst=dst, + locations=locations, + read_only=read_only, + ) + + async def copy_remote_to_local( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + read_only: bool = False, + ) -> None: + if self.connector is None: + if not self.deploying: + self.deploying = True + await self.deploy(self.external) + else: + await self.deploy_event.wait() + await self.connector.copy_remote_to_local( + src=src, + dst=dst, + locations=locations, + read_only=read_only, + ) + + async def copy_remote_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + source_location: Location, source_connector: Connector | None = None, - source_location: str | None = None, read_only: bool = False, ) -> None: if self.connector is None: @@ -44,13 +84,12 @@ async def copy( await self.deploy_event.wait() if isinstance(source_connector, FutureConnector): source_connector = source_connector.connector - await self.connector.copy( + await self.connector.copy_remote_to_remote( src=src, dst=dst, locations=locations, - kind=kind, - source_connector=source_connector, source_location=source_location, + source_connector=source_connector, read_only=read_only, ) @@ -125,3 +164,15 @@ async def run( async def undeploy(self, external: bool) -> None: if self.connector is not None: await self.connector.undeploy(external) + + +class FutureMeta(ABCMeta): + def __instancecheck__(cls, instance): + if isinstance(instance, FutureConnector): + return super().__subclasscheck__(instance.type) + else: + return super().__instancecheck__(instance) + + +class FutureAware(metaclass=FutureMeta): + __slots__ = () diff --git 
a/streamflow/deployment/deployment_manager.py b/streamflow/deployment/manager.py similarity index 95% rename from streamflow/deployment/deployment_manager.py rename to streamflow/deployment/manager.py index 177ba038c..ae940410a 100644 --- a/streamflow/deployment/deployment_manager.py +++ b/streamflow/deployment/manager.py @@ -3,7 +3,6 @@ import asyncio import logging import os -from asyncio import Event from typing import TYPE_CHECKING import pkg_resources @@ -41,7 +40,7 @@ async def _deploy( while True: if deployment_name not in self.config_map: self.config_map[deployment_name] = deployment_config - self.events_map[deployment_name] = Event() + self.events_map[deployment_name] = asyncio.Event() self.dependency_graph[deployment_name] = set() connector_type = connector_classes[deployment_config.type] deployment_config = await self._inner_deploy( @@ -102,7 +101,7 @@ async def _inner_deploy( # If it has already been processed by the DeploymentManager if deployment_name in self.config_map: # If the DeploymentManager is creating the environment, wait for it to finish - if not self.is_deployed(deployment_name): + if deployment_name not in self.deployments_map: await self.events_map[deployment_name].wait() # Check for recursive definitions wrappers_stack.update(deployment_name) @@ -154,10 +153,10 @@ async def _inner_deploy( ) return deployment_config - async def close(self): + async def close(self) -> None: await self.undeploy_all() - async def deploy(self, deployment_config: DeploymentConfig): + async def deploy(self, deployment_config: DeploymentConfig) -> None: deployment_name = deployment_config.name await self._deploy(deployment_config, {deployment_name}) self.dependency_graph[deployment_name].add(deployment_name) @@ -171,10 +170,7 @@ def get_schema(cls) -> str: __name__, os.path.join("schemas", "deployment_manager.json") ) - def is_deployed(self, deployment_name: str): - return deployment_name in self.deployments_map - - async def undeploy(self, deployment_name: str): + async def undeploy(self, deployment_name: str) -> None: if deployment_name in dict(self.deployments_map): await self.events_map[deployment_name].wait() # Remove the deployment from the dependency graph @@ -204,7 +200,7 @@ async def undeploy(self, deployment_name: str): if len(deps) == 0: await self.undeploy(name) - async def undeploy_all(self): + async def undeploy_all(self) -> None: undeployments = [] for name in dict(self.deployments_map): undeployments.append(asyncio.create_task(self.undeploy(name))) diff --git a/streamflow/deployment/wrapper.py b/streamflow/deployment/wrapper.py index 0daa9d709..b5e6da240 100644 --- a/streamflow/deployment/wrapper.py +++ b/streamflow/deployment/wrapper.py @@ -2,32 +2,59 @@ from abc import ABC from typing import Any, MutableMapping, MutableSequence, Optional, Tuple, Union -from streamflow.core.deployment import Connector, ConnectorCopyKind, Location +from streamflow.core.deployment import Connector, Location from streamflow.core.scheduling import AvailableLocation +from streamflow.deployment.future import FutureAware -class ConnectorWrapper(Connector, ABC): +class ConnectorWrapper(Connector, FutureAware, ABC): def __init__(self, deployment_name: str, config_dir: str, connector: Connector): super().__init__(deployment_name, config_dir) self.connector: Connector = connector - async def copy( + async def copy_local_to_remote( self, src: str, dst: str, locations: MutableSequence[Location], - kind: ConnectorCopyKind, + read_only: bool = False, + ) -> None: + await 
self.connector.copy_local_to_remote( + src=src, + dst=dst, + locations=locations, + read_only=read_only, + ) + + async def copy_remote_to_local( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + read_only: bool = False, + ) -> None: + await self.connector.copy_remote_to_local( + src=src, + dst=dst, + locations=locations, + read_only=read_only, + ) + + async def copy_remote_to_remote( + self, + src: str, + dst: str, + locations: MutableSequence[Location], + source_location: Location, source_connector: Optional[Connector] = None, - source_location: Optional[Location] = None, read_only: bool = False, ) -> None: - await self.connector.copy( + await self.connector.copy_remote_to_remote( src=src, dst=dst, locations=locations, - kind=kind, - source_connector=source_connector, source_location=source_location, + source_connector=source_connector, read_only=read_only, ) diff --git a/streamflow/main.py b/streamflow/main.py index c1c214ca2..17899c6e7 100644 --- a/streamflow/main.py +++ b/streamflow/main.py @@ -44,7 +44,7 @@ async def _async_ext(args: argparse.Namespace): async def _async_list(args: argparse.Namespace): context = _get_context_from_config(args.file) try: - if workflows := await context.database.list_workflows(args.name): + if workflows := await context.database.get_workflows_list(args.name): max_sizes = { k: len(max(str(w[k]) for w in workflows)) + 1 for k in workflows[0].keys() diff --git a/streamflow/persistence/sqlite.py b/streamflow/persistence/sqlite.py index 5e0903a2b..28b4b6f67 100644 --- a/streamflow/persistence/sqlite.py +++ b/streamflow/persistence/sqlite.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import json import os from typing import Any, MutableMapping, MutableSequence @@ -126,7 +127,11 @@ async def add_deployment( return cursor.lastrowid async def add_port( - self, name: str, workflow_id: int, type: type[Port], params: str + self, + name: str, + workflow_id: int, + type: type[Port], + params: MutableMapping[str, Any], ) -> int: async with self.connection as db: async with db.execute( @@ -136,7 +141,7 @@ async def add_port( "name": name, "workflow": workflow_id, "type": utils.get_class_fullname(type), - "params": params, + "params": json.dumps(params), }, ) as cursor: return cursor.lastrowid @@ -158,7 +163,12 @@ async def add_provenance(self, inputs: MutableSequence[int], token: int): ) async def add_step( - self, name: str, workflow_id: int, status: int, type: type[Step], params: str + self, + name: str, + workflow_id: int, + status: int, + type: type[Step], + params: MutableMapping[str, Any], ) -> int: async with self.connection as db: async with db.execute( @@ -169,7 +179,7 @@ async def add_step( "workflow": workflow_id, "status": status, "type": utils.get_class_fullname(type), - "params": params, + "params": json.dumps(params), }, ) as cursor: return cursor.lastrowid @@ -178,7 +188,7 @@ async def add_target( self, deployment: int, type: type[Target], - params: str, + params: MutableMapping[str, Any], locations: int = 1, service: str | None = None, workdir: str | None = None, @@ -188,7 +198,7 @@ async def add_target( "INSERT INTO target(params, type, deployment, locations, service, workdir) " "VALUES (:params, :type, :deployment, :locations, :service, :workdir)", { - "params": params, + "params": json.dumps(params), "type": utils.get_class_fullname(type), "deployment": deployment, "locations": locations, @@ -214,12 +224,19 @@ async def add_token( ) as cursor: return cursor.lastrowid - async def add_workflow(self, name: 
str, params: str, status: int, type: str) -> int: + async def add_workflow( + self, name: str, params: MutableMapping[str, Any], status: int, type: str + ) -> int: async with self.connection as db: async with db.execute( "INSERT INTO workflow(name, params, status, type) " "VALUES(:name, :params, :status, :type)", - {"name": name, "params": params, "status": status, "type": type}, + { + "name": name, + "params": json.dumps(params), + "status": status, + "type": type, + }, ) as cursor: return cursor.lastrowid @@ -375,19 +392,6 @@ async def get_workflow(self, workflow_id: int) -> MutableMapping[str, Any]: ) as cursor: return await cursor.fetchone() - async def get_workflows_by_name( - self, workflow_name: str, last_only: bool = False - ) -> MutableSequence[MutableMapping[str, Any]]: - async with self.connection as db: - db.row_factory = aiosqlite.Row - async with db.execute( - "SELECT * FROM workflow WHERE name = :name ORDER BY id desc", - {"name": workflow_name}, - ) as cursor: - return ( - [await cursor.fetchone()] if last_only else await cursor.fetchall() - ) - async def get_workflow_ports( self, workflow_id: int ) -> MutableSequence[MutableMapping[str, Any]]: @@ -410,7 +414,20 @@ async def get_workflow_steps( ) as cursor: return await cursor.fetchall() - async def list_workflows( + async def get_workflows_by_name( + self, workflow_name: str, last_only: bool = False + ) -> MutableSequence[MutableMapping[str, Any]]: + async with self.connection as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM workflow WHERE name = :name ORDER BY id desc", + {"name": workflow_name}, + ) as cursor: + return ( + [await cursor.fetchone()] if last_only else await cursor.fetchall() + ) + + async def get_workflows_list( self, name: str | None ) -> MutableSequence[MutableMapping[str, Any]]: async with self.connection as db: diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index b0ad3d7bd..fe0ec0524 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -11,7 +11,6 @@ import urllib.parse import uuid from abc import ABC, abstractmethod -from json import JSONDecodeError from typing import Any, MutableMapping, MutableSequence, cast from zipfile import ZipFile @@ -661,7 +660,7 @@ async def add_file(self, file: MutableMapping[str, str]) -> None: v = ESCAPED_COMMA.sub(",", v) try: self.graph[dst][k] = json.loads(v) - except JSONDecodeError: + except json.JSONDecodeError: self.graph[dst][k] = v @abstractmethod @@ -689,7 +688,7 @@ async def add_property(self, key: str, value: str): value = ESCAPED_COMMA.sub(",", value) try: current_obj[keys[-1]] = json.loads(value) - except JSONDecodeError: + except json.JSONDecodeError: current_obj[keys[-1]] = value async def create_archive( diff --git a/streamflow/recovery/checkpoint_manager.py b/streamflow/recovery/checkpoint_manager.py index 85785364d..1b364177e 100644 --- a/streamflow/recovery/checkpoint_manager.py +++ b/streamflow/recovery/checkpoint_manager.py @@ -33,7 +33,7 @@ async def _async_local_copy(self, data_location: DataLocation): parent_directory = os.path.join(self.checkpoint_dir, random_name()) local_path = os.path.join(parent_directory, data_location.relpath) await self.context.data_manager.transfer_data( - src_locations=[data_location], + src_location=data_location, src_path=data_location.path, dst_locations=[Location(deployment=LOCAL_LOCATION, name=LOCAL_LOCATION)], dst_path=local_path, diff --git a/streamflow/scheduling/policy/data_locality.py 
b/streamflow/scheduling/policy/data_locality.py index 2b11b2d9b..8c6a78afd 100644 --- a/streamflow/scheduling/policy/data_locality.py +++ b/streamflow/scheduling/policy/data_locality.py @@ -9,6 +9,7 @@ from streamflow.core.context import StreamFlowContext from streamflow.core.data import DataType from streamflow.core.deployment import Location +from streamflow.core.exception import WorkflowExecutionException from streamflow.core.scheduling import Hardware, JobAllocation, Policy from streamflow.workflow.token import FileToken @@ -23,13 +24,17 @@ async def get_location( self, context: StreamFlowContext, job: Job, - deployment: str, hardware_requirement: Hardware, available_locations: MutableMapping[str, AvailableLocation], jobs: MutableMapping[str, JobAllocation], locations: MutableMapping[str, MutableMapping[str, LocationAllocation]], ) -> Location | None: valid_locations = list(available_locations.keys()) + deployments = {loc.deployment for loc in available_locations.values()} + if len(deployments) > 1: + raise WorkflowExecutionException( + f"Available locations coming from multiple deployments: {deployments}" + ) # For each input token sorted by weight weights = { k: v @@ -55,8 +60,8 @@ async def get_location( loc.name for loc in context.data_manager.get_data_locations( path=path, - deployment=deployment, - location_type=DataType.PRIMARY, + deployment=next(iter(deployments)), + data_type=DataType.PRIMARY, ) ] ) diff --git a/streamflow/scheduling/scheduler.py b/streamflow/scheduling/scheduler.py index ac6527ff4..119d7b7f0 100644 --- a/streamflow/scheduling/scheduler.py +++ b/streamflow/scheduling/scheduler.py @@ -132,7 +132,6 @@ def _get_binding_filter(self, config: Config): async def _get_locations( self, job: Job, - deployment: str, hardware_requirement: Hardware, locations: int, scheduling_policy: Policy, @@ -143,7 +142,6 @@ async def _get_locations( selected_location = await scheduling_policy.get_location( context=self.context, job=job, - deployment=deployment, hardware_requirement=hardware_requirement, available_locations=available_locations, jobs=self.job_allocations, @@ -272,7 +270,6 @@ async def _process_target( ]: selected_locations = await self._get_locations( job=job_context.job, - deployment=target.deployment.name, hardware_requirement=hardware_requirement, locations=target.locations, scheduling_policy=self._get_policy( @@ -298,7 +295,6 @@ async def _process_target( else: selected_locations = await self._get_locations( job=job_context.job, - deployment=target.deployment.name, hardware_requirement=hardware_requirement, locations=target.locations, scheduling_policy=self._get_policy( diff --git a/tests/test_transfer.py b/tests/test_transfer.py index 5198b5e7c..74978b6ac 100644 --- a/tests/test_transfer.py +++ b/tests/test_transfer.py @@ -75,7 +75,7 @@ async def test_directory_to_directory( data_type=DataType.PRIMARY, ) await context.data_manager.transfer_data( - src_locations=[src_location], + src_location=src_location, src_path=src_path, dst_locations=[dst_location], dst_path=dst_path, @@ -130,7 +130,7 @@ async def test_file_to_directory( data_type=DataType.PRIMARY, ) await context.data_manager.transfer_data( - src_locations=[src_location], + src_location=src_location, src_path=src_path, dst_locations=[dst_location], dst_path=dst_path, @@ -182,7 +182,7 @@ async def test_file_to_file( data_type=DataType.PRIMARY, ) await context.data_manager.transfer_data( - src_locations=[src_location], + src_location=src_location, src_path=src_path, dst_locations=[dst_location], dst_path=dst_path,