Skip to content

Commit

Permalink
⚙️ add 'unbuffer' as a submit configuration to sge
Browse files Browse the repository at this point in the history
  • Loading branch information
juanesarango committed Oct 18, 2024
1 parent 24eb26f commit 379a588
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions isabl_cli/batch_systems/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from os.path import join
import os
import random
import shutil
import subprocess

from slugify import slugify
Expand Down Expand Up @@ -83,6 +84,7 @@ def submit_sge(app, command_tuples): # pragma: no cover
requirements=requirements or "",
extra_args=submit_configuration.get("extra_args", ""),
throttle_by=submit_configuration.get("throttle_by", 50),
unbuffer=submit_configuration.get("unbuffer", False),
jobname=(
f"application: {app} | "
f"methods: {', '.join(methods)} | "
Expand All @@ -98,7 +100,13 @@ def submit_sge(app, command_tuples): # pragma: no cover


def submit_sge_array(
commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False
commands,
requirements,
jobname,
extra_args=None,
throttle_by=50,
wait=False,
unbuffer=False,
): # pragma: no cover
"""
Submit an array of bash scripts.
Expand All @@ -115,6 +123,7 @@ def submit_sge_array(
extra_args (str): extra LSF args.
throttle_by (int): max number of jobs running at same time.
wait (bool): if true, wait until clean command finishes.
unbuffer (bool): if true, will unbuffer the stdout/stderr.
Returns:
str: jobid of clean up job.
Expand All @@ -129,7 +138,8 @@ def submit_sge_array(
datetime.now(system_settings.TIME_ZONE).isoformat(),
)

wait = "-sync y" if wait else ""
wait_flag = "-sync y" if wait else ""
unbuffer = "unbuffer" if unbuffer and shutil.which("unbuffer") else ""
os.makedirs(root, exist_ok=True)
jobname += "-rundir: {}".format(root)
jobname = slugify(jobname)
Expand All @@ -144,11 +154,14 @@ def submit_sge_array(

with open(join(root, "in.%s" % index), "w") as f:
# use random sleep to avoid parallel API hits
sge_command = (
f"sleep {random.uniform(0, 10):.3} && {unbuffer} bash {command}"
)
f.write(
COMMAND.format(
exit_command=exit_command,
exit_log=join(rundir, "head_job.exit"),
command=f"sleep {random.uniform(0, 10):.3} && bash {command}",
command=sge_command,
)
)

Expand All @@ -174,7 +187,7 @@ def submit_sge_array(
jobid = jobid.strip().split(".")[0]

cmd = (
f'qsub {base_args} -N "CLEAN-{jobname}" -hold_jid {jobid} {wait} '
f'qsub {base_args} -N "CLEAN-{jobname}" -hold_jid {jobid} {wait_flag} '
f"-o /dev/null -e /dev/null {root}/clean.sh"
)

Expand Down

0 comments on commit 379a588

Please sign in to comment.