Merged
43 changes: 39 additions & 4 deletions _helpers.py
@@ -44,15 +44,47 @@ def attempt_load_config():

def fmt_remote_commands(commands):
"""
Formats a list of shell commands to be run in the SshShell instance.
Necessary because underlying Python SSH client (Paramiko) won't run any
state changes between commands. So we run them all at once
Formats a list-like iterable of shell commands to be run in the SshShell
instance. Necessary because the underlying Python SSH client (Paramiko)
won't persist state changes between separate commands, so we run them all
at once.
"""
assert hasattr(commands, "__iter__"), \
"Commands passed to fmt_remote_commands must be an iterable " \
"(i.e., list-like) object"

executable = ['bash', '-c']
commands_str = ' && '.join(commands)
commands_str = [' && '.join(commands)]

return executable + commands_str
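
A minimal usage sketch (assuming "cluster" is an open spurplus.SshShell
connection; the commands and paths here are hypothetical):

    cmd = fmt_remote_commands(['cd ~/my_project', 'qsub run_job.sh'])
    # cmd == ['bash', '-c', 'cd ~/my_project && qsub run_job.sh'], so both
    # commands run in a single bash process and the "cd" carries over
    cluster.run(cmd)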


def get_qstat(remote_shell, options=None):
"""
Return the output of running "qstat" on the cluster, optionally with
additional options (e.g., a filter on the job's status)
:param remote_shell: (spurplus.SshShell instance)
:param options: (str)
options to run along with the "qstat" command. For further
information, run get_qstat(remote_shell, options='man') locally
or "man qstat" from the cluster.
:return qstat_output: (str) output of running the command on the cluster
"""
if options is None:
cmd = ['qstat']
elif options == 'man':
cmd = ['man qstat']
elif not options.startswith('-'):
cmd = ['qstat -' + options]
else:
cmd = ['qstat ' + options]

cmds_fmt = fmt_remote_commands(cmd)
return remote_shell.check_output(cmds_fmt)
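
A few hedged examples of how the options argument is interpreted (again
assuming "cluster" is an open spurplus.SshShell connection; the username is
hypothetical):

    get_qstat(cluster)                        # runs "qstat"
    get_qstat(cluster, options='u f00abcd')   # no leading "-", so runs "qstat -u f00abcd"
    get_qstat(cluster, options='-f')          # already has "-", so runs "qstat -f"
    get_qstat(cluster, options='man')         # runs "man qstat" on the cluster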


def md5_checksum(filepath):
"""
computes the MD5 checksum of a local file to compare against remote
@@ -86,6 +118,9 @@ def parse_config(config_path):
config['confirm_overwrite_on_upload'] = raw_config.getboolean(
'CONFIG', 'confirm_overwrite_on_upload'
)
config['confirm_resubmission'] = raw_config.getboolean(
'CONFIG', 'confirm_resubmission'
)
return config
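
With the new field, a parsed config might look roughly like the following
(all values hypothetical; the boolean fields are read with getboolean, so
"true"/"false" strings in the file become Python bools):

    config = parse_config('configs/my_config.txt')   # hypothetical path
    # {'hostname': 'discovery.example.edu',
    #  'username': 'f00abcd',
    #  'password': '********',
    #  'confirm_overwrite_on_upload': False,
    #  'confirm_resubmission': True}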


31 changes: 0 additions & 31 deletions cluster_scripts/run_job_cluster.sh

This file was deleted.

2 changes: 1 addition & 1 deletion cluster_scripts/submit.py
@@ -60,7 +60,7 @@
#PBS -M $email_addr

echo ---
echo job name: $job_name
echo script name: $job_name
echo loading modules: $modules
module load $modules
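
For context, the "script name: " prefix echoed here is what the new
resubmit_failed.py (added below) looks for when mapping a job's stdout file
back to the script that produced it; a hedged sketch, assuming stdout holds
this template's output:

    # stdout might contain a line like "script name: my_job_0.sh" (hypothetical name)
    job_script = stdout.split('script name: ')[1].splitlines()[0]   # -> 'my_job_0.sh'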

2 changes: 1 addition & 1 deletion configs/template_config.txt
@@ -6,5 +6,5 @@
hostname = # name of host where you plan to run jobs
username = # your username for logging into your cluster account
password = # your password (NOTE: make sure you don't push this to GitHub!)
submit_command = # command used to submit jobs (mksub if your username starts with f00, otherwise qsub)
confirm_resubmission = # (true or false) whether or not you want to be prompted before resubmitting jobs identified as having failed
confirm_overwrite_on_upload = # (true or false) whether or not you want to be prompted when overwriting existing remote files with local changes
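
A filled-in config based on this template might look like the following; all
values are hypothetical, and the [CONFIG] section name is taken from the
getboolean calls in parse_config:

    [CONFIG]
    hostname = discovery.example.edu
    username = f00abcd
    password = dont-push-me-to-github
    confirm_resubmission = true
    confirm_overwrite_on_upload = false
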
17 changes: 7 additions & 10 deletions remote_submit.py
@@ -2,13 +2,13 @@
from os.path import dirname, realpath, join as opj
from spurplus import connect_with_retries
from .upload_scripts import upload_scripts
from .cluster_scripts.config import job_config
from ._helpers import (
attempt_load_config,
fmt_remote_commands,
parse_config,
write_remote_submitter
)
from .cluster_scripts.config import job_config


def remote_submit(sync_changes=False, config_path=None):
@@ -21,13 +21,6 @@ def remote_submit(sync_changes=False, config_path=None):
:param config_path: (str, optional, default: None) path to your config file.
If you created your config following the instructions in
configs/template_config.txt, you can simply leave this empty
:param await_output: (bool, default: False) if True, keep the connection with
the remote open until your submit script is finished creating jobs.
Otherwise, terminate the connection after calling the submit script and allow
job submission to happen in the background.
WARNING: This can be rather lengthy process depending on the number of jobs
you're running. Setting this to True opens you up to the possibility that
the ssh connection may fail before job submission is finished
:return: None (other than, hopefully, some results eventually!)
"""
if config_path is None:
@@ -38,9 +31,13 @@
hostname = config['hostname']
username = config['username']
password = config['password']
job_cmd = config['submit_command']
confirm_overwrite = config['confirm_overwrite_on_upload']

if username.startswith('f00'):
job_cmd = 'mksub'
else:
job_cmd = 'qsub'

# set commands
if job_config['env_type'] == 'conda':
activate_cmd = 'source activate'
@@ -59,7 +56,7 @@
script_dir = opj(dirname(realpath(__file__)), 'cluster_scripts')
upload_scripts(cluster, script_dir, job_config, confirm_overwrite)

# create bash script to submit jobs from compute node
# create a bash script that runs submit.py (which submits the jobs) from a compute node
submitter_filepath = write_remote_submitter(
cluster,
job_config,
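
For reference, a hedged sketch of calling the updated function (the import
path is an assumption; it depends on how the package is installed and named):

    from cluster_tools.remote_submit import remote_submit   # hypothetical package name
    # config_path omitted, so attempt_load_config() locates the default config;
    # the submit command (mksub vs. qsub) is now derived from the username
    remote_submit(sync_changes=True)
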
136 changes: 136 additions & 0 deletions resubmit_failed.py
@@ -0,0 +1,136 @@
import sys
from argparse import ArgumentParser
from os.path import join as opj
from spurplus import connect_with_retries
from .cluster_scripts.config import job_config
from ._helpers import (
attempt_load_config,
fmt_remote_commands,
get_qstat,
parse_config,
prompt_input
)


def resubmit_failed(confirm_resubmission=False, config_path=None):
"""
Resubmit jobs identified as having failed during initial submission.
Compares the job scripts in the remote script directory against the
stdout files of finished jobs and the currently running jobs reported
by "qstat", removes the failed jobs' stdout/stderr files, and
resubmits their scripts.
:param confirm_resubmission: (bool, default: False) if True, show the
list of to-be-resubmitted jobs and prompt for confirmation before
resubmitting them, overriding the default set in your config file
:param config_path: (str, optional, default: None) path to your config
file. If you created your config following the instructions in
configs/template_config.txt, you can simply leave this empty
:return: None
"""

if config_path is None:
config = attempt_load_config()
else:
config = parse_config(config_path)

hostname = config['hostname']
username = config['username']
password = config['password']
confirm = config['confirm_resubmission']

workingdir = job_config['workingdir']
scriptdir = job_config['scriptdir']
job_name = job_config['jobname']

# set confirmation option from config if not set here
if confirm and not confirm_resubmission:
confirm_resubmission = True

# set submission command
if username.startswith('f00'):
job_cmd = 'mksub'
else:
job_cmd = 'qsub'

with connect_with_retries(
hostname=hostname,
username=username,
password=password
) as cluster:
cluster_sftp = cluster.as_sftp()

# get all created bash scripts
all_scripts = cluster_sftp.listdir(scriptdir)
print(f"found {len(all_scripts)} job scripts")

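# Torque/PBS-style schedulers write each job's stdout to <job_name>.o<jobid>
# (and stderr to <job_name>.e<jobid>) in the working directory, so these
# filenames can be mapped back to job IDs below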
stdout_files = [f for f in cluster_sftp.listdir(workingdir)
if f.startswith(f'{job_name}.o')]
print(f"found {len(stdout_files)} job stdout files")

# get output of qstat command
running_jobs = [line for line in get_qstat(cluster).splitlines()
if len(line) > 0 and line[0].isnumeric()]
# filter out completed jobs, isolate jobid
running_jobids = [line.split('.')[0] for line in running_jobs
if line.split()[-2] != 'C']
print(f"found {len(running_jobids)} running jobs")

print("parsing stdout files...")

successful_jobs = {}
for outfile in stdout_files:
jobid = outfile.split('.o')[1]

# read stdout file
stdout_path = opj(workingdir, outfile)
stdout = cluster.read_text(stdout_path)
try:
job_script = stdout.split('script name: ')[1].splitlines()[0]
# track successfully finished jobs
if 'job script finished' in stdout:
successful_jobs[job_script] = jobid
except (IndexError, ValueError):
print(
f"failed to find corresponding script for {outfile}..."
)
continue

to_resubmit = [s for s in all_scripts
if s not in list(successful_jobs.keys())]

if confirm_resubmission:
view_scripts = prompt_input("View jobs to be resubmitted before \
proceeding?")
if view_scripts:
print('\n'.join(to_resubmit))
resubmit_confirmed = prompt_input("Do you want to resubmit \
these jobs?")
if not resubmit_confirmed:
sys.exit()

print("Removing failed jobs' stdout/stderr files...")
for outfile in stdout_files:
jobid = outfile.split('.o')[1]
if not (jobid in successful_jobs.values()
or jobid in running_jobids):
stdout_path = opj(workingdir, outfile)
stderr_path = opj(workingdir, f'{job_name}.e{jobid}')
cluster.remove(stdout_path)
cluster.remove(stderr_path)

print(f"resubmitting {len(to_resubmit)} jobs")
for job in to_resubmit:
script_path = opj(scriptdir, job)
print(f"resubmitting {job}")
cmd = fmt_remote_commands([f'{job_cmd} {script_path}'])
cluster.run(cmd)


if __name__ == '__main__':
description = "Resubmit jobs identified as having failed during initial \
submission"
arg_parser = ArgumentParser(description=description)
arg_parser.add_argument(
"-confirm",
action='store_true',
help="Whether or not you want to be shown the list of \
to-be-resubmitted jobs and prompted to confirm before resubmitting \
them. Passing this overrides default behavior set in your config file"
)
arg_parser.add_argument(
"--config-path",
default=None,
type=str,
help="Path to your config file (optional unless you've moved your \
config file)"
)

args = arg_parser.parse_args()
resubmit_failed(args.confirm, args.config_path)
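
A hedged sketch of invoking the new script from the command line (the package
name is an assumption, and since resubmit_failed.py uses relative imports it
would be run as a module):

    # review the list of failed jobs before resubmitting, regardless of the config default
    python -m cluster_tools.resubmit_failed -confirm

    # point at a config file in a non-default location
    python -m cluster_tools.resubmit_failed --config-path /path/to/my_config.txt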