-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support more methods of WDL task disk specification #5001
Changes from 54 commits
3971be6
48990d0
e8d223c
fbe0eef
8f7199a
166bf41
d7719b9
0c131e0
b4714ec
4443728
8da0d7d
9379715
85a4df9
6aa73b1
cc36f71
36fd277
c7bec56
4b4e7f0
6bf2a4e
31b7e27
98bba55
25e0e51
42fedf6
d35d033
9d75e4b
f752535
4ce33d5
e9df2f9
01b8102
2955c4d
a364601
02d873f
b65b315
6f30676
58196ce
66d3e50
ea19cb6
32c65dd
63b4410
a1a8651
29ffd3f
7068810
c090823
14e2ee1
901c4c2
aa58e2f
8b15af6
e04f5c1
ae2f169
eb56ef9
a21fc3a
1a098b4
ceccb07
839e09b
89ca0d4
bf3ca2a
4130ab8
d194edc
f2080a8
f0dbe3f
e6a9082
6fbef8c
68fb254
8d093dd
b184386
67b2554
7f4b452
07d6b31
f3a3a2f
9ab5c9c
0f247a6
067ee8d
a7a1459
c7ac131
806ff9c
acaa894
accf831
a0b65f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,7 @@ | |
from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException, | ||
InvalidImportExportUrlException, LocatorException) | ||
from toil.lib.accelerators import get_individual_local_accelerators | ||
from toil.lib.conversions import convert_units, human2bytes | ||
from toil.lib.conversions import convert_units, human2bytes, VALID_PREFIXES | ||
from toil.lib.io import mkdtemp | ||
from toil.lib.memoize import memoize | ||
from toil.lib.misc import get_user_name | ||
|
@@ -90,6 +90,11 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
class InsufficientMountDiskSpace(Exception): | ||
def __init__(self, mount_targets: List[str], desired_bytes: int, available_bytes: int) -> None: | ||
super().__init__("Not enough available disk space for the target mount points %s. Needed %d bytes but there is only %d available." | ||
% (", ".join(mount_targets), desired_bytes, available_bytes)) | ||
|
||
@contextmanager | ||
def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]: | ||
""" | ||
|
@@ -109,7 +114,8 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] | |
LocatorException, | ||
InvalidImportExportUrlException, | ||
UnimplementedURLException, | ||
JobTooBigError | ||
JobTooBigError, | ||
InsufficientMountDiskSpace | ||
) as e: | ||
# Don't expose tracebacks to the user for exceptions that may be expected | ||
log("Could not " + task + " because:") | ||
|
@@ -1915,43 +1921,87 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: | |
memory_spec = human2bytes(memory_spec) | ||
runtime_memory = memory_spec | ||
|
||
mount_spec: Dict[Optional[str], int] = dict() | ||
if runtime_bindings.has_binding('disks'): | ||
# Miniwdl doesn't have this, but we need to be able to parse things like: | ||
# local-disk 5 SSD | ||
# which would mean we need 5 GB space. Cromwell docs for this are at https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#disks | ||
# We ignore all disk types, and complain if the mount point is not `local-disk`. | ||
disks_spec: str = runtime_bindings.resolve('disks').value | ||
all_specs = disks_spec.split(',') | ||
# Sum up the gigabytes in each disk specification | ||
total_gb = 0 | ||
disks_spec: Union[List[WDL.Value.String], str] = runtime_bindings.resolve('disks').value | ||
if isinstance(disks_spec, list): | ||
# SPEC says to use the first one | ||
# the parser gives an array of WDL string objects | ||
all_specs = [part.value for part in disks_spec] | ||
else: | ||
all_specs = disks_spec.split(',') | ||
# Sum up the space in each disk specification | ||
total_bytes: float = 0 | ||
for spec in all_specs: | ||
# Split up each spec as space-separated. We assume no fields | ||
# are empty, and we want to allow people to use spaces after | ||
# their commas when separating the list, like in Cromwell's | ||
# examples, so we strip whitespace. | ||
spec_parts = spec.strip().split(' ') | ||
if len(spec_parts) != 3: | ||
# TODO: Add a WDL line to this error | ||
raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not have 3 space-separated parts") | ||
if spec_parts[0] != 'local-disk': | ||
# TODO: Add a WDL line to this error | ||
raise NotImplementedError(f"Could not provide disks = {disks_spec} because only the local-disks mount point is implemented") | ||
try: | ||
total_gb += int(spec_parts[1]) | ||
except: | ||
# TODO: Add a WDL line to this error | ||
raise ValueError(f"Could not parse disks = {disks_spec} because {spec_parts[1]} is not an integer") | ||
# TODO: we always ignore the disk type and assume we have the right one. | ||
# TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I | ||
# can't imagine that ever being standardized; just leave it | ||
# alone so that the workflow doesn't rely on this weird and | ||
# likely-to-change Cromwell detail. | ||
if spec_parts[2] == 'LOCAL': | ||
logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') | ||
total_bytes: float = convert_units(total_gb, 'GB') | ||
runtime_disk = int(total_bytes) | ||
|
||
# First check that this is a format we support. Both the WDL spec and Cromwell allow a max 3-piece specification | ||
# So if there are more than 3 pieces, raise an error | ||
if len(spec_parts) > 3: | ||
raise RuntimeError(f"Could not parse disks = {disks_spec} because {all_specs} contains more than 3 parts") | ||
part_size = None | ||
# default to GiB as per spec | ||
part_suffix: str = "GiB" # The WDL spec's default is 1 GiB | ||
# default to the execution directory | ||
specified_mount_point = None | ||
# first get the size, since units should always be some nonnumerical string, get the last numerical value | ||
for i, part in enumerate(spec_parts): | ||
if part.replace(".", "", 1).isdigit(): | ||
part_size = int(float(part)) | ||
spec_parts.pop(i) | ||
break | ||
# unit specification is only allowed to be at the end | ||
if spec_parts[-1].lower() in VALID_PREFIXES: | ||
part_suffix = spec_parts[-1] | ||
spec_parts.pop(-1) | ||
# The last remaining element, if it exists, is the mount point | ||
if len(spec_parts) > 0: | ||
specified_mount_point = spec_parts[0] | ||
|
||
if part_size is None: | ||
# Disk spec did not include a size | ||
raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") | ||
|
||
|
||
if part_suffix == "LOCAL": | ||
# TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I | ||
# can't imagine that ever being standardized; just leave it | ||
# alone so that the workflow doesn't rely on this weird and | ||
# likely-to-change Cromwell detail. | ||
logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') | ||
elif part_suffix in ("HDD", "SSD"): | ||
# For cromwell compatibility, assume this means GB in units | ||
# We don't actually differentiate between HDD and SSD | ||
part_suffix = "GB" | ||
|
||
per_part_size = convert_units(part_size, part_suffix) | ||
total_bytes += per_part_size | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The total space needed (including any |
||
if mount_spec.get(specified_mount_point) is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This check doesn't account for how If there was a test for this validation logic, that would catch the problem. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I mapped |
||
if specified_mount_point is not None: | ||
# raise an error as all mount points must be unique | ||
raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {specified_mount_point} is specified multiple times") | ||
else: | ||
if mount_spec.get(specified_mount_point) is not None: | ||
stxue1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise ValueError(f"Could not parse disks = {disks_spec} because the mount point is omitted more than once") | ||
|
||
# TODO: we always ignore the disk type and assume we have the right one. | ||
if specified_mount_point != "local-disk": | ||
# Don't mount local-disk. This isn't in the spec, but is carried over from cromwell | ||
# When the mount point is omitted, default to the task's execution directory, which None will represent | ||
mount_spec[specified_mount_point] = int(per_part_size) | ||
else: | ||
# local-disk is equivalent to an omitted mount point | ||
mount_spec[None] = int(per_part_size) | ||
runtime_disk = int(total_bytes) | ||
|
||
if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'): | ||
# For old WDL versions, guess whether the task wants GPUs if not specified. | ||
use_gpus = (runtime_bindings.has_binding('gpuCount') or | ||
|
@@ -1985,7 +2035,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: | |
runtime_accelerators = [accelerator_requirement] | ||
|
||
# Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. | ||
run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) | ||
run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, | ||
self._task_path, mount_spec, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, | ||
accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) | ||
# Run that as a child | ||
self.addChild(run_job) | ||
|
||
|
@@ -2008,7 +2060,8 @@ class WDLTaskJob(WDLBaseJob): | |
All bindings are in terms of task-internal names. | ||
""" | ||
|
||
def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None: | ||
def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, | ||
task_path: str, mount_spec: Dict[Optional[str], int], **kwargs: Any) -> None: | ||
""" | ||
Make a new job to run a task. | ||
|
||
|
@@ -2032,6 +2085,7 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind | |
self._task_id = task_id | ||
self._namespace = namespace | ||
self._task_path = task_path | ||
self._mount_spec = mount_spec | ||
|
||
### | ||
# Runtime code injection system | ||
|
@@ -2211,6 +2265,58 @@ def can_mount_proc(self) -> bool: | |
""" | ||
return "KUBERNETES_SERVICE_HOST" not in os.environ | ||
|
||
def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[Optional[str], int]) -> Dict[str, str]: | ||
""" | ||
Ensure the mount point sources are available. | ||
|
||
Will check if the mount point source has the requested amount of space available. | ||
|
||
Note: We are depending on Toil's job scheduling backend to error when the sum of multiple mount points disk requests is greater than the total available | ||
For example, if a task has two mount points request 100 GB each but there is only 100 GB available, the df check may pass | ||
but Toil should fail to schedule the jobs internally | ||
|
||
:param mount_spec: Mount specification from the disks attribute in the WDL task. Is a dict where key is the mount point target and value is the size | ||
:param file_store: File store to create a tmp directory for the mount point source | ||
:return: Dict mapping mount point target to mount point source | ||
""" | ||
logger.debug("Detected mount specifications, creating mount points.") | ||
mount_src_mapping = {} | ||
# Create one tmpdir to encapsulate all mount point sources, each mount point will be associated with a subdirectory | ||
tmpdir = file_store.getLocalTempDir() | ||
|
||
# The POSIX standard doesn't specify how to escape spaces in mount points and file system names | ||
# The only defect of this regex is if the target mount point is the same format as the df output | ||
# It is likely reliable enough to trust the user has not created a mount with a df output-like name | ||
regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+") | ||
total_mount_size = sum(mount_spec.values()) | ||
try: | ||
# Use arguments from the df POSIX standard | ||
df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1] | ||
m = re.match(regex_df, df_line) | ||
if m is None: | ||
logger.debug("Output of df may be malformed: %s", df_line) | ||
logger.warning("Unable to check disk requirements as output of 'df' command is malformed. Will assume storage is always available.") | ||
else: | ||
# Block size will always be 1024 | ||
available_space = int(m[1]) * 1024 | ||
if available_space < total_mount_size: | ||
# We do not have enough space available for this mount point | ||
# An omitted mount point is the task's execution directory so show that to the user instead | ||
raise InsufficientMountDiskSpace([mount_point if mount_point is not None else "/mnt/miniwdl_task_container/work" for mount_point in mount_spec.keys()], | ||
total_mount_size, available_space) | ||
except subprocess.CalledProcessError as e: | ||
# If df somehow isn't available | ||
logger.debug("Unable to call df. stdout: %s stderr: %s", e.stdout, e.stderr) | ||
logger.warning("Unable to check disk requirements as call to 'df' command failed. Will assume storage is always available.") | ||
for mount_target in mount_spec.keys(): | ||
# Create a new subdirectory for each mount point | ||
source_location = os.path.join(tmpdir, str(uuid.uuid4())) | ||
os.mkdir(source_location) | ||
if mount_target is not None: | ||
# None represents an omitted mount point, which will default to the task's work directory. MiniWDL's internals will mount the task's work directory by itself | ||
mount_src_mapping[mount_target] = source_location | ||
Comment on lines
+2511
to
+2515
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This makes a directory for omitted mount points even if it never uses them. We also sort of assume Docker/Singularity is pulling from the same storage we are for |
||
return mount_src_mapping | ||
|
||
@report_wdl_errors("run task command", exit=True) | ||
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: | ||
""" | ||
|
@@ -2224,6 +2330,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: | |
# We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them | ||
standard_library = ToilWDLStdLibBase(file_store, self._task_path, enforce_existence=False) | ||
|
||
# Create mount points and get a mapping of target mount points to locations on disk | ||
mount_mapping = self.ensure_mount_point(file_store, self._mount_spec) | ||
|
||
# Get the bindings from after the input section | ||
bindings = unwrap(self._task_internal_bindings) | ||
# And the bindings from evaluating the runtime section | ||
|
@@ -2385,8 +2494,51 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: | |
return command_line | ||
|
||
# Apply the patch | ||
task_container._run_invocation = patched_run_invocation # type: ignore | ||
task_container._run_invocation = patched_run_invocation # type: ignore | ||
|
||
singularity_original_prepare_mounts = task_container.prepare_mounts | ||
|
||
def patch_prepare_mounts_singularity() -> List[Tuple[str, str, bool]]: | ||
""" | ||
Mount the mount points specified from the disk requirements. | ||
|
||
The singularity and docker patch are separate as they have different function signatures | ||
""" | ||
# todo: support AWS EBS/Kubernetes persistent volumes | ||
# this logic likely only works for local clusters as we don't deal with the size of each mount point | ||
mounts: List[Tuple[str, str, bool]] = singularity_original_prepare_mounts() | ||
# todo: support AWS EBS/Kubernetes persistent volumes | ||
# this logic likely only works for local clusters as we don't deal with the size of each mount point | ||
for mount_point, source_location in mount_mapping.items(): | ||
mounts.append((mount_point, source_location, True)) | ||
return mounts | ||
task_container.prepare_mounts = patch_prepare_mounts_singularity # type: ignore[method-assign] | ||
elif isinstance(task_container, SwarmContainer): | ||
docker_original_prepare_mounts = task_container.prepare_mounts | ||
|
||
try: | ||
# miniwdl depends on docker so this should be available but check just in case | ||
import docker | ||
# docker stubs are still WIP: https://github.com/docker/docker-py/issues/2796 | ||
from docker.types import Mount # type: ignore[import-untyped] | ||
|
||
def patch_prepare_mounts_docker(logger: logging.Logger) -> List[Mount]: | ||
""" | ||
Same as the singularity patch but for docker | ||
""" | ||
mounts: List[Mount] = docker_original_prepare_mounts(logger) | ||
for mount_point, source_location in mount_mapping.items(): | ||
mounts.append( | ||
Mount( | ||
mount_point.rstrip("/").replace("{{", '{{"{{"}}'), | ||
source_location.rstrip("/").replace("{{", '{{"{{"}}'), | ||
type="bind", | ||
) | ||
) | ||
return mounts | ||
task_container.prepare_mounts = patch_prepare_mounts_docker # type: ignore[method-assign] | ||
except ImportError: | ||
logger.warning("Docker package not installed. Unable to add mount points.") | ||
# Show the runtime info to the container | ||
task_container.process_runtime(miniwdl_logger, {binding.name: binding.value for binding in devirtualize_files(runtime_bindings, standard_library)}) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
We should also probably switch the default unit to `GB` here, since that is what the Cromwell syntax expects.

There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to keep the default unit as `GiB`, as that is the WDL spec default: https://github.com/openwdl/wdl/blob/e43e042104b728df1f1ad6e6145945d2b32331a6/SPEC.md?plain=1#L5082