Update Wrapper to Support Pegasus 5.0.0 #73

Merged (14 commits) on Feb 8, 2021
23 changes: 11 additions & 12 deletions README.md
@@ -1,6 +1,3 @@
<!--
[![Build status](https://ci.appveyor.com/api/projects/status/3jhdnwreqoni1492/branch/master?svg=true)](https://ci.appveyor.com/project/isi-vista/vista-pegasus-wrapper/branch/master)
-->
[![Build status](https://travis-ci.com/isi-vista/vista-pegasus-wrapper.svg?branch=master)](https://travis-ci.com/isi-vista/vista-pegasus-wrapper?branch=master)
[![codecov](https://codecov.io/gh/isi-vista/vista-pegasus-wrapper/branch/master/graph/badge.svg)](https://codecov.io/gh/isi-vista/vista-pegasus-wrapper)

@@ -28,25 +25,27 @@ This library simplifies the process of writing a profile which can be converted
Using [WorkflowBuilder from `workflow.py`](pegasus_wrapper/workflow.py), develop a function to generate a `Workflow.dax`.
See [example_workflow](pegasus_wrapper/scripts/example_workflow_builder.py) for an extremely simple workflow which we will use to demonstrate the process.
To see the example workflow, add a `root.params` file to the parameters directory with the following:
*Note: the directory should be in your `$HOME` and not on an NFS mount like `/nas/gaia/`, as the submission will fail for an NFS-related reason.*
```
example_root_dir: "path/to/output/dir/"
conda_environment: "pegasus-wrapper"
conda_base_path: "path/to/conda"
```
Run `python -m pegasus_wrapper.scripts.example_workflow_builder parameters/root.params` from this project's root folder.

The log output will provide the output location of the `Test.dax`. Assuming you are logged into a submit node with an active Pegasus install:

```
cd "path/to/output/dir"
pegasus-plan --conf pegasus.conf --dax Test.dax --dir "path/to/output/dir" --relative-dir exampleRun-001
pegasus-run "path/to/output/dir/"exampleRun-001
./submit.sh
```
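`submit.sh` is generated for you by the wrapper. Based on the `SUBMIT_SCRIPT` template in `pegasus_utils.py` (whose trailing flags are collapsed in this diff), it amounts to roughly the following sketch, not the exact script:

```
#!/bin/bash
set -e

pegasus-plan Test.dax \
  --conf pegasus.properties \
  --dir "path/to/output/dir" \
  --cleanup leaf \
  --force
```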
The example workflow submits **ONLY** to `scavenge`. In an actual workflow we would recommend parameterizing it.

Our current system places `ckpt` files to indicate that a job has finished, so that if the DAX needs to be regenerated to fix a bug, completed jobs are not re-run. This system is not comprehensive, as it currently requires manual control. When submitting a new job using previous handles, use a new relative dir in the plan and run.

A [Nuke Checkpoints](scripts/nuke_checkpoints.py) script is provided for ease of removing checkpoint files. To use, pass a directory location as the launch parameter and the script will remove checkpoint files from the directory and all sub-directories.
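For example (the output path is hypothetical):

```
python scripts/nuke_checkpoints.py "path/to/output/dir"
```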

We recommend hosting a workflow under a shared directory on the NAS, e.g. `/nas/gaia`, rather than under a user's `/nas/user/xyz` home directory, due to space limitations on the home directories.

# FAQ
## How can I exclude some nodes?

@@ -57,11 +56,6 @@ Use the `run_on_single_node` parameter when you initialize a workflow (or a Slurm resource request); see the example after the notes below.
* Note you cannot use this option with the **exclude_list** option.
* Note you cannot specify more than one node using this option.
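For example, in your `.params` file (the node name is hypothetical, and we assume the option is read from the same parameters file as the rest of the workflow configuration):

```
run_on_single_node: "saga01"
```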

## What are valid root directories for the workflow?

Currently the root directory should be in your home directory and not on a NAS like `/nas/gaia/`, as the submission will fail for an NFS-related reason.
The experiment directory can be (and ought to be) on such a drive, though.

Contributor:

Are there any restrictions on valid root dirs? Should the README include a statement that root dirs can now be on shared /nas/gaia or /nas/user/xyz?

Collaborator (Author):

There are no longer any restrictions on valid root dirs. There used to be an NFS-related bug when trying to mount a shared NAS as the root dir for Pegasus, but it has been resolved.

Collaborator (Author):

@danielnapierski I've added a note to the readme that the root_dir can be set to any location on the NAS, and that a shared drive, e.g. /nas/gaia, may be preferred due to space limitations on the user home dirs. However, if unspecified, the script will default to a user's /nas/home/xyz/ dir.

# Common Errors

## Mismatching partition selection and max walltime
@@ -72,7 +66,8 @@ Partitions each have a max walltime associated with them. See the saga cluster w

If you change code while a pipeline is running, the jobs will pick up the changes. This could be helpful if you notice an error and fix it before that code runs, but it can also lead to unexpected behavior.

## `No module named 'Pegasus'`
## `No module named 'Pegasus'` (Version 4.9.3)
*This is believed to have been fixed in Pegasus Version 5. If this arises, please open an issue.*

This is a weird one that usually pops up when first getting set up with Pegasus. First, if you see this, please contact one of the maintainers (currently @joecummings or @spigo900). To fix it, install the following packages with these commands in this exact order; they depend on each other.
Member:

You can remove this fix. The comment you added is enough.

1. `pip install git+https://github.com/pegasus-isi/pegasus/#egg=pegasus-wms.common&subdirectory=packages/pegasus-common`
@@ -85,6 +80,10 @@ This is a weird one that pops up usually when first getting set up with Pegasus.

A new node obtained with `srun` does not load the Spack modules you usually have set up in your runtime scripts. You need to set these up manually if you want to work with TensorFlow or anything requiring CUDA.
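For example, something along these lines (a sketch; the partition and package names are placeholders for whatever your runtime scripts normally use):

```
srun --partition=ephemeral --pty bash
spack load cuda
spack load cudnn
```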

# Updating the wrapper from Pegasus 4.9.3 to Pegasus 5.0.0

No changes should be needed for any project that used the previous version of the wrapper, which supported Pegasus 4.9.3.

# Contributing

Run `make precommit` before committing.
2 changes: 1 addition & 1 deletion pegasus_wrapper/artifact.py
@@ -13,7 +13,7 @@
from pegasus_wrapper.locator import Locator

from more_itertools import collapse
from Pegasus.DAX3 import File, Job
from Pegasus.api import File, Job
from typing_extensions import Protocol


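For reference, the `Pegasus.api` replacements for the old `Pegasus.DAX3` `File`/`Job` classes are used roughly like this under Pegasus 5 (a minimal sketch with made-up names, not code from this repo):

```python
from Pegasus.api import File, Job

input_file = File("input.txt")
output_file = File("output.txt")

# A job references a transformation by name and declares its inputs/outputs.
job = (
    Job("example_transformation")
    .add_args("--input", input_file, "--output", output_file)
    .add_inputs(input_file)
    .add_outputs(output_file)
)
```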
158 changes: 73 additions & 85 deletions pegasus_wrapper/pegasus_utils.py
@@ -1,16 +1,25 @@
from pathlib import Path
from typing import Optional

from Pegasus.api import Arch
from Pegasus.DAX3 import OS, PFN, Executable, File
from vistautils.parameters import Parameters

from Pegasus.api import (
    OS,
    Arch,
    Directory,
    FileServer,
    Operation,
    Properties,
    Site,
    SiteCatalog,
)

SUBMIT_SCRIPT = """#!/bin/bash

set -e

pegasus-plan \\
--conf pegasus.conf \\
--dax {dax_file} \\
{dax_file} \\
--conf pegasus.properties \\
--dir {workflow_directory} \\
--cleanup leaf \\
--force \\
@@ -20,89 +29,68 @@
"""


def path_to_pegasus_file(
    path: Path,
    *,
    site: str = "local",
    name: Optional[str] = None,
    is_raw_input: bool = False
) -> File:
    """
    Given a *path* object return a pegasus `File` for usage in a workflow
    If the resource is not on a local machine provide the *site* string.
    Files can be used for either an input or output of a Job.

    Args:
        path: path to the file
        site: site to be used, default is local. Should be set to saga if running
            on cluster.
        name: name given to the file
        is_raw_input: indicates that the file doesn't come from the output of another
            job in the workflow, so can be safely added to the Pegasus DAX
    Returns:
        Pegasus File at the given path

    """
    rtnr = File(name if name else str(path.absolute()).replace("/", "-"))
    if is_raw_input:
        rtnr.addPFN(path_to_pfn(path, site=site))
    return rtnr


def path_to_pfn(path: Path, *, site: str = "local") -> PFN:
    return PFN(str(path.absolute()), site=site)


def script_to_pegasus_executable(
    path: Path,
    name: Optional[str] = None,
    *,
    site: str = "local",
    namespace: Optional[str] = None,
    version: Optional[str] = None,
    arch: Optional[Arch] = None,
    os: Optional[OS] = None,
    osrelease: Optional[str] = None,
    osversion: Optional[str] = None,
    glibc: Optional[str] = None,
    installed: Optional[bool] = None,
    container: Optional[str] = None
) -> Executable:
    """
    Turns a script path into a pegasus Executable

    Arguments:
        *name*: Logical name of executable
        *namespace*: Executable namespace
        *version*: Executable version
        *arch*: Architecture that this exe was compiled for
        *os*: Name of os that this exe was compiled for
        *osrelease*: Release of os that this exe was compiled for
        *osversion*: Version of os that this exe was compiled for
        *glibc*: Version of glibc this exe was compiled against
        *installed*: Is the executable installed (true), or stageable (false)
        *container*: Optional attribute to specify the container to use
    """

    rtrnr = Executable(
        path.stem + path.suffix if name is None else name,
        namespace=namespace,
        version=version,
        arch=arch,
        os=os,
        osrelease=osrelease,
        osversion=osversion,
        glibc=glibc,
        installed=installed,
        container=container,
    )
    rtrnr.addPFN(path_to_pfn(path, site=site))
    return rtrnr


def build_submit_script(path: Path, dax_file: str, workflow_directory: Path) -> None:
    path.write_text(
        SUBMIT_SCRIPT.format(workflow_directory=workflow_directory, dax_file=dax_file)
    )
    # Designate the submit script as executable
    path.chmod(0o777)


def add_local_nas_to_sites(
    sites_catalog: SiteCatalog, params: Parameters = Parameters.empty()
) -> None:
    home = params.string("home_dir", default=str(Path.home().absolute()))
    shared_scratch_dir = params.string(
        "local_shared_scratch", default=f"{home}/workflows/scratch"
    )
    local_storage_dir = params.string("local_storage", default=f"{home}/workflows/output")

    sites_catalog.add_sites(
        Site("local", arch=Arch.X86_64, os_type=OS.LINUX).add_directories(
            Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
                FileServer("file://" + shared_scratch_dir, Operation.ALL)
            ),
            Directory(Directory.LOCAL_STORAGE, local_storage_dir).add_file_servers(
                FileServer("file://" + local_storage_dir, Operation.ALL)
            ),
        )
    )


def add_saga_cluster_to_sites(
    sites_catalog: SiteCatalog, params: Parameters = Parameters.empty()
) -> None:
    home = params.string("home_dir", default=str(Path.home().absolute()))

    shared_scratch_dir = params.string(
        "saga_shared_scratch", default=f"{home}/workflows/shared-scratch"
    )

    saga = Site("saga", arch=Arch.X86_64, os_type=OS.LINUX)
    saga.add_directories(
        Directory(Directory.SHARED_SCRATCH, shared_scratch_dir).add_file_servers(
            FileServer("file://" + shared_scratch_dir, Operation.ALL)
        )
    )

    saga.add_env(
        key="PEGASUS_HOME", value="/nas/gaia/shared/cluster/pegasus5/pegasus-5.0.0"
    )

    # Profiles
    saga.add_pegasus_profile(style="glite", auxillary_local=True)
    saga.add_condor_profile(grid_resource="batch slurm")

    sites_catalog.add_sites(saga)


def configure_saga_properities(  # pylint: disable=unused-argument
    properties: Properties, params: Parameters = Parameters.empty()
) -> None:
    properties["pegasus.data.configuration"] = "sharedfs"
    properties["pegasus.metrics.app"] = "SAGA"
    properties["dagman.retry"] = "0"

    # TODO: Implement a method to add parameters to this properties file
    # See: https://github.com/isi-vista/vista-pegasus-wrapper/issues/72
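For context, a minimal sketch of how these helpers could be wired together by a caller (the file paths and the use of the default `write()` targets are assumptions, not code from this repo):

```python
from pathlib import Path

from Pegasus.api import Properties, SiteCatalog
from vistautils.parameters import Parameters

params = Parameters.empty()

# Build a site catalog covering the local site and the saga cluster.
sites = SiteCatalog()
add_local_nas_to_sites(sites, params)
add_saga_cluster_to_sites(sites, params)
sites.write()  # assumed to write sites.yml by default under the 5.0 API

# Configure the properties consumed by pegasus-plan.
properties = Properties()
configure_saga_properities(properties, params)
properties.write()  # assumed to write pegasus.properties by default

# Generate the submit script that plans and runs the workflow.
build_submit_script(Path("submit.sh"), "Workflow.dax", Path.cwd())
```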
40 changes: 19 additions & 21 deletions pegasus_wrapper/resource_request.py
@@ -9,7 +9,7 @@
from vistautils.parameters import Parameters
from vistautils.range import Range

from Pegasus.DAX3 import Job, Namespace, Profile
from Pegasus.api import Job
from saga_tools.slurm import to_slurm_memory_string
from typing_extensions import Protocol

@@ -159,7 +159,7 @@ def from_parameters(params: Parameters) -> ResourceRequest:

    def unify(self, other: ResourceRequest) -> ResourceRequest:
        if isinstance(other, SlurmResourceRequest):
            partition = other.partition or self.partition
            partition = other.partition
        else:
            partition = self.partition

@@ -186,19 +186,11 @@ def apply_to_job(self, job: Job, *, job_name: str) -> None:
f"Partition '{self.partition.name}' has a max walltime of {self.partition.max_walltime} mins, which is less than the time given ({self.job_time_in_minutes} mins) for job: {job_name}."
)

        qos_or_account = (
            f"qos {self.partition.name}"
            if self.partition.name in (SCAVENGE, EPHEMERAL)
            else f"account {self.partition.name}"
        )
        slurm_resource_content = SLURM_RESOURCE_STRING.format(
            qos_or_account=qos_or_account,
            partition=self.partition.name,
            num_cpus=self.num_cpus or 1,
            num_gpus=self.num_gpus if self.num_gpus is not None else 0,
            job_name=job_name,
            mem_str=to_slurm_memory_string(self.memory or _SLURM_DEFAULT_MEMORY),
            time=self.convert_time_to_slurm_format(self.job_time_in_minutes),
        )

        if (
@@ -216,18 +208,24 @@ def apply_to_job(self, job: Job, *, job_name: str) -> None:
        if self.run_on_single_node:
            slurm_resource_content += f" --nodelist={self.run_on_single_node}"

        logging.debug(
            "Slurm Resource Request for %s: %s", job_name, slurm_resource_content
        )
        job.addProfile(
            Profile(Namespace.PEGASUS, "glite.arguments", slurm_resource_content)
        )
        if self.partition.name in (SCAVENGE, EPHEMERAL):
            slurm_resource_content += f" --qos={self.partition.name}"

        job.add_pegasus_profile(
            runtime=self.job_time_in_minutes,
            queue=str(self.partition.name),
            project=None
            if self.partition.name in (EPHEMERAL, SCAVENGE)
            else self.partition.name,
            glite_arguments=slurm_resource_content,
        )
        category_profile = Profile(Namespace.DAGMAN, "category", self.partition)
        if not job.hasProfile(category_profile):
            job.addProfile(category_profile)

        if (
            "dagman" not in job.profiles.keys()
            or "CATEGORY" not in job.profiles["dagman"].keys()
        ):
            job.add_dagman_profile(category=str(self.partition))


SLURM_RESOURCE_STRING = """--{qos_or_account} --partition {partition} --ntasks 1
--cpus-per-task {num_cpus} --gpus-per-task {num_gpus} --job-name {job_name} --mem {mem_str}
--time {time}"""
SLURM_RESOURCE_STRING = """--ntasks=1 --cpus-per-task={num_cpus} --gpus-per-task={num_gpus} --job-name={job_name} --mem={mem_str}"""
_BACKEND_PARAM = "backend"
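
For illustration, here is roughly what the new template renders to (a sketch; the values are made up):

```python
# Renders the glite arguments passed through job.add_pegasus_profile above.
example = SLURM_RESOURCE_STRING.format(
    num_cpus=4, num_gpus=0, job_name="example_job", mem_str="4G"
)
# example == "--ntasks=1 --cpus-per-task=4 --gpus-per-task=0 "
#            "--job-name=example_job --mem=4G"
```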
20 changes: 0 additions & 20 deletions pegasus_wrapper/resources/pegasus.conf

This file was deleted.
