
Commit a9e5a83

Merge pull request #12 from paxtonfitzpatrick/remote_submit
solid framework for revamp, including basic functionality for submitting jobs to cluster from local machine
2 parents 675bf62 + 64b4983 commit a9e5a83

File tree

7 files changed (+280 −44 lines)

_helpers.py

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
import hashlib
import os
import sys
from os.path import realpath, join as opj, sep as pathsep
from configparser import ConfigParser


def attempt_load_config():
    """
    tries to load the config file from its expected path in cases where
    neither a filepath nor a dict-like object is provided
    """
    splitpath = realpath(__file__).split(pathsep)
    try:
        try:
            # get path to project root directory
            splitroot = splitpath[: splitpath.index('cluster-tools-dartmouth') + 1]
            project_root = pathsep.join(splitroot)
            config_dir = opj(project_root, 'configs')
        except ValueError as e:
            # pass exception on to the broader outer exception for the function
            raise FileNotFoundError(
                f"cluster-tools-dartmouth not found in path {realpath(__file__)}"
            ).with_traceback(e.__traceback__)

        configs = os.listdir(config_dir)
        # filter out hidden files and the template config
        configs = [f for f in configs
                   if not (f.startswith('template') or f.startswith('.'))]
        if len(configs) == 1:
            config_path = opj(config_dir, configs[0])
            return parse_config(config_path)
        else:
            # fail if multiple or no config files are found
            raise FileNotFoundError(
                f"Unable to determine which config file to read from "
                f"{len(configs)} choices in {config_dir}"
            )

    except FileNotFoundError as e:
        raise FileNotFoundError(
            "Failed to load config file from expected location"
        ).with_traceback(e.__traceback__)


def md5_checksum(filepath):
    """
    computes the MD5 checksum of a local file to compare against the remote copy

    NOTE: MD5 IS CONSIDERED CRYPTOGRAPHICALLY INSECURE
    (see https://en.wikipedia.org/wiki/MD5#Security)
    However, it's still very much suitable in cases (like ours) where one
    wouldn't expect **intentional** data corruption
    """
    hash_md5 = hashlib.md5()
    with open(filepath, 'rb') as f:
        # read in chunks to avoid loading the whole file into memory at once
        for chunk in iter(lambda: f.read(4096), b''):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def parse_config(config_path):
    """
    parses various user-specific options from a config file in the configs dir
    """
    raw_config = ConfigParser(inline_comment_prefixes='#')
    with open(config_path, 'r') as f:
        raw_config.read_file(f)

    config = dict(raw_config['CONFIG'])
    config['confirm_overwrite_on_upload'] = raw_config.getboolean(
        'CONFIG', 'confirm_overwrite_on_upload'
    )
    return config


def prompt_input(question, default=None):
    """
    given a question, prompts the user for command-line input;
    returns True for 'yes'/'y' and False for 'no'/'n' responses
    """
    assert default in ('yes', 'no', None), \
        "Default response must be either 'yes', 'no', or None"

    valid_responses = {
        'yes': True,
        'y': True,
        'no': False,
        'n': False
    }

    if default is None:
        prompt = "[y/n]"
    elif default == 'yes':
        prompt = "[Y/n]"
    else:
        prompt = "[y/N]"

    while True:
        sys.stdout.write(f"{question}\n{prompt} ")
        response = input().lower()
        # if the user hits return without typing, return the default response
        if (default is not None) and (not response):
            return valid_responses[default]
        elif response in valid_responses:
            return valid_responses[response]
        else:
            sys.stdout.write("Please respond with either 'yes' (or 'y') "
                             "or 'no' (or 'n')\n")
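
A minimal sketch of how these helpers compose (the file name and prompt text below are hypothetical):

from _helpers import md5_checksum, prompt_input

# hash a local file in 4096-byte chunks
local_hash = md5_checksum('my_script.py')
print(f'local MD5: {local_hash}')

# default='yes' renders the prompt as [Y/n], so pressing return with no
# input counts as a 'yes'
if prompt_input('Re-upload my_script.py?', default='yes'):
    print('uploading...')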

cluster_scripts/config.py

Lines changed: 17 additions & 31 deletions
@@ -1,38 +1,24 @@
-import socket
-import os
-from ..helpers import parse_config
+from os.path import dirname, realpath, join as opj

-config = dict()
+job_config = dict()


 # ====== MODIFY ONLY THE CODE BETWEEN THESE LINES ======
 # job creation options
-
-# ******** check kiewit hostname from eduroam ********
-if (socket.gethostname() == 'Paxtons-MacBook-Pro') or (socket.gethostname() == 'Paxtons-MacBook-Pro.kiewit.dartmouth.edu') or (socket.gethostname() == 'Paxtons-MacBook-Pro.local'):
-    config['datadir'] = '/Users/paxtonfitzpatrick/Documents/Dartmouth/Thesis/memory-dynamics/data/models/participants/trajectories'
-    config['workingdir'] = config['datadir']
-    config['startdir'] = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))  # directory to start the job in
-    config['template'] = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'run_job_local.sh')
-else:
-    config['datadir'] = os.path.join('/dartfs/rc/lab/D/DBIC/CDL/f0028ph/eventseg', 'trajectories')
-    config['workingdir'] = '/dartfs/rc/lab/D/DBIC/CDL/f0028ph/eventseg/cluster-scripts'
-    config['startdir'] = '/dartfs/rc/lab/D/DBIC/CDL/f0028ph/eventseg/'
-    config['template'] = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'run_job_cluster.sh')
-
-config['scriptdir'] = os.path.join(config['workingdir'], 'scripts')
-config['lockdir'] = os.path.join(config['workingdir'], 'locks')
-config['resultsdir'] = os.path.join(config['workingdir'], 'results')
-
+job_config['startdir'] = # path to the folder for this project. Should be something like /dartfs/rc/lab/D/DBIC/CDL/<your_net_id>/<project_name>
+job_config['datadir'] = opj(job_config['startdir'], 'data')
+job_config['workingdir'] = opj(job_config['startdir'], 'scripts')
+job_config['template'] = opj(dirname(realpath(__file__)), 'run_job_cluster.sh')
+job_config['scriptdir'] = opj(job_config['workingdir'], 'scripts')
+job_config['lockdir'] = opj(job_config['workingdir'], 'locks')


 # runtime options
-config['jobname'] = # (str) default job name
-config['q'] = # (str) options: default, test, largeq
-config['nnodes'] = # (int) how many nodes to use for this one job
-config['ppn'] = # (int) how many processors to use for this one job (assume 4GB of RAM per processor)
-config['walltime'] = # (str) maximum runtime, in h:MM:SS
-config['cmd_wrapper'] = # (str) replace with actual command wrapper (e.g. matlab, python, etc.)
-config['modules']
-
-#extra options
-
+job_config['jobname'] = # (str) default job name
+job_config['q'] = # (str) options: default, test, largeq (when in doubt, use "largeq")
+job_config['nnodes'] = # (int) how many nodes to use for this one job
+job_config['ppn'] = # (int) how many processors to use for this one job (assume 4GB of RAM per processor)
+job_config['walltime'] = # (str) maximum runtime, in h:MM:SS (e.g., "10:00:00")
+job_config['cmd_wrapper'] = # (str) replace with the actual command wrapper (e.g. "python", "matlab", etc.)
+job_config['modules'] = # (str) modules you need to load for your scripts, separated by spaces (e.g., "python matlab")
+job_config['env_type'] = # (str) what kind of Python environment you use (NOTE: the sole option is currently conda -- venv and virtualenv coming soon!)
+job_config['env_name'] = # (str) name of the (currently, conda) environment you want your submission script and jobs to run in
 # ====== MODIFY ONLY THE CODE BETWEEN THESE LINES ======
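
Filled in, the user-editable section might look like the following sketch (every value here is hypothetical):

job_config['startdir'] = '/dartfs/rc/lab/D/DBIC/CDL/f00xxxx/my_project'
job_config['jobname'] = 'my_analysis'
job_config['q'] = 'largeq'
job_config['nnodes'] = 1
job_config['ppn'] = 4  # roughly 16GB of RAM at 4GB per processor
job_config['walltime'] = '10:00:00'
job_config['cmd_wrapper'] = 'python'
job_config['modules'] = 'python'
job_config['env_type'] = 'conda'
job_config['env_name'] = 'my_env'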

configs/template_config.txt

Lines changed: 3 additions & 3 deletions
@@ -1,10 +1,10 @@
 # DO NOT EDIT THIS FILE
 # You don't want to push sensitive information to GitHub by mistake!
-# Duplicate this file, rename it config.txt and fill in each field with your personal options
+# Duplicate this file, rename it as you see fit, and fill in each field with your personal options

 [CONFIG]
 hostname = # name of host where you plan to run jobs
 username = # your username for logging into your cluster account
-password = # your password (remember make sure you don't push this to GitHub!)
+password = # your password (NOTE: make sure you don't push this to GitHub!)
 submit_command = # command used to submit jobs (mksub if your username starts with f00, otherwise qsub)
-confirm_overwrite_on_upload = true
+confirm_overwrite_on_upload = # (true or false) whether or not you want to be prompted before overwriting existing remote files with local changes
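
Once duplicated and filled in, a config might look like this sketch (every value below is made up):

[CONFIG]
hostname = discovery.example.edu   # hypothetical cluster address
username = f00xxxx
password = my-secret-password
submit_command = mksub             # username above starts with f00
confirm_overwrite_on_upload = true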

helpers.py

Lines changed: 0 additions & 10 deletions
This file was deleted.

remote_submit.py

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
from os.path import dirname, realpath, join as opj
from spurplus import connect_with_retries
from .upload_scripts import upload_scripts
from ._helpers import attempt_load_config, parse_config
from .cluster_scripts.config import job_config


def remote_submit(config_path=None, sync_changes=False, await_output=False):
    """
    main function that handles submitting jobs on the cluster from your local
    machine

    :param config_path: (str, optional, default: None) path to your config
      file. If you created your config following the instructions in
      configs/template_config.txt, you can simply leave this empty
    :param sync_changes: (bool, default: False) if True, upload any local
      changes to cluster scripts before submitting jobs
    :param await_output: (bool, default: False) if True, keep the connection
      with the remote open until your submit script is finished creating jobs.
      Otherwise, terminate the connection after calling the submit script and
      allow job submission to happen in the background.
      WARNING: this can be a rather lengthy process depending on the number of
      jobs you're running. Setting this to True opens you up to the
      possibility that the SSH connection may fail before job submission is
      finished
    :return: None (other than, hopefully, some results eventually!)
    """
    if config_path is None:
        config = attempt_load_config()
    else:
        config = parse_config(config_path)

    hostname = config['hostname']
    username = config['username']
    password = config['password']
    confirm_overwrite = config['confirm_overwrite_on_upload']

    modules = job_config['modules']
    env_type = job_config['env_type']
    env_name = job_config['env_name']
    submit_cmd_wrapper = job_config['cmd_wrapper']
    # TODO: ability to handle custom-named submission script
    submit_script_path = opj(job_config['workingdir'], 'submit.py')

    # pre-submission commands to be concatenated and run together in remote shell
    remote_cmds = ['sh', '-c']
    # command for loading module(s)
    module_load_cmd = f'module load {modules}'
    # command for activating virtual environment
    if env_type == 'conda':
        activate_cmd = 'source activate'
    else:
        # TODO: add commands for venv & virtualenv activation
        raise ValueError("Only conda environments are currently supported")
    env_activate_cmd = f'{activate_cmd} {env_name}'
    # command for calling submit script
    submit_cmd = f'{submit_cmd_wrapper} {submit_script_path}'

    full_submission_cmd = ' && '.join([
        module_load_cmd,
        env_activate_cmd,
        submit_cmd
    ])

    remote_cmds.append(full_submission_cmd)

    with connect_with_retries(
            hostname=hostname,
            username=username,
            password=password
    ) as cluster:
        if sync_changes:
            script_dir = opj(dirname(realpath(__file__)), 'cluster_scripts')
            upload_scripts(
                cluster,
                script_dir,
                job_config,
                confirm_overwrite=confirm_overwrite
            )

        if await_output:
            output = cluster.check_output(remote_cmds)
            print(output)
        else:
            cluster.run(remote_cmds)
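
With a filled-in config and job_config, kicking off jobs from the local machine might look like this minimal sketch (the package and config path are hypothetical):

# assumes the project directory is importable as a package named
# cluster_tools (hypothetical name)
from cluster_tools.remote_submit import remote_submit

# upload any locally edited cluster scripts first, then call the remote
# submit script and let job creation continue in the background
remote_submit(config_path='configs/my_config.txt', sync_changes=True)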

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
spurplus
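
The lone third-party dependency can be installed from the project root with the standard command:

pip install -r requirements.txt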

upload_scripts.py

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
import os
from os.path import dirname, realpath, join as opj
from spurplus import connect_with_retries
from ._helpers import md5_checksum, attempt_load_config, prompt_input
from .cluster_scripts.config import job_config


def upload_scripts(remote_shell, local_script_dir, job_conf, confirm_overwrite=True):
    remote_startdir = job_conf['startdir']
    remote_workingdir = job_conf['workingdir']
    remote_datadir = job_conf['datadir']

    to_upload = os.listdir(local_script_dir)
    # ignore hidden files (e.g., .DS_Store on macOS)
    to_upload = [f for f in to_upload if not f.startswith('.')]
    for remote_dir in [remote_startdir, remote_workingdir, remote_datadir]:
        try:
            remote_shell.is_dir(remote_dir)
        except FileNotFoundError:
            # the is_dir method raises an exception if the path doesn't exist
            print(f'creating remote directory: {remote_dir}')
            remote_shell.mkdir(remote_dir)

    print("uploading scripts...")
    for file in to_upload:
        src_path = opj(local_script_dir, file)
        dest_path = opj(remote_workingdir, file)
        if remote_shell.exists(dest_path):
            # don't bother uploading the file if it hasn't been edited
            local_checksum = md5_checksum(src_path)
            remote_checksum = remote_shell.md5(dest_path)
            if local_checksum == remote_checksum:
                print(f"skipping {file} (no changes)")
                continue

            if confirm_overwrite:
                # prompt for confirmation of overwrite if the option is enabled
                question = f"{file}: overwrite remote version with local changes?"
                overwrite_confirmed = prompt_input(question)
                if not overwrite_confirmed:
                    print(f"skipping {file} (overwrite declined)")
                    continue

        remote_shell.put(src_path, dest_path, create_directories=False)
        print(f"uploaded {file}")
    print("finished uploading scripts")


# setup for running as a stand-alone script
if __name__ == '__main__':
    config = attempt_load_config()
    hostname = config['hostname']
    username = config['username']
    password = config['password']
    confirm_overwrite = config['confirm_overwrite_on_upload']

    script_dir = opj(dirname(realpath(__file__)), 'cluster_scripts')

    with connect_with_retries(
            hostname=hostname,
            username=username,
            password=password
    ) as cluster:
        upload_scripts(cluster, script_dir, job_config, confirm_overwrite=confirm_overwrite)
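
Against a remote working directory that already holds an earlier upload, a session might look something like this (file names and responses are hypothetical):

uploading scripts...
skipping submit.py (no changes)
config.py: overwrite remote version with local changes?
[y/n] y
uploaded config.py
finished uploading scripts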
