Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion clustertools/file_objects/configs/config_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,18 @@ def update_config_from_global(inst: ProjectConfig, pref: bool) -> bool:
return pref


@bindable
def init_project_job_monitor(inst: ProjectConfig, pref: bool) -> bool:
# initializes a monitor Job object on the associated Project object
# when auto_monitor_jobs is set to True, removes it when set to False
if pref and not inst._config.monitoring.auto_monitor_jobs:
inst._project._init_monitor()
elif not pref:
inst._project._monitor_script = inst._project._monitor = None
return pref


PROJECT_CONFIG_UPDATE_HOOKS = {
'use_global_environ': update_config_from_global
'use_global_environ': update_config_from_global,
'auto_monitor_jobs': init_project_job_monitor
}
114 changes: 13 additions & 101 deletions clustertools/project/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def wrapper_template(self):
@property
def wrapper(self):
proj = self._project
config = proj.config
# first, fill in fields whose values:
# A) in turn contain configurable fields, or
# B) depend on the job type
Expand All @@ -115,11 +116,11 @@ def wrapper(self):
field_vals['environ_export_directive'] = ''
field_vals['mail_options'] = ''
if self.kind in ('runner', 'collector', 'pre_submit'):
field_vals['job_executable']: proj.job_executable
field_vals['n_nodes']: proj.n_nodes
field_vals['ppn']: proj.ppn
field_vals['wall_time']: proj.wall_time
modules = ' '.join(proj.modules)
field_vals['job_executable'] = config.general.job_executable
field_vals['n_nodes'] = config.pbs_params.n_nodes
field_vals['ppn'] = config.pbs_params.ppn
field_vals['wall_time'] = config.pbs_params.wall_time
modules = ' '.join(config.runtime_environment.modules)
field_vals['dependency_directive'] = ''
if any(modules):
cmd = f"""echo "loading modules: {modules}"
Expand All @@ -130,7 +131,7 @@ def wrapper(self):
field_vals['module_load_cmd'] = ''
if self.kind == 'collector':
field_vals['hold_directive'] = '#${directive_prefix} -h'
if proj.notify_collector_finished:
if config.notifications.collector_finished:
field_vals['mail_options'] += 'e'
else:
field_vals['hold_directive'] = ''
Expand Down Expand Up @@ -158,7 +159,7 @@ def wrapper(self):
mins = total_secs % 3600 // 60
secs = total_secs % 3600 % 60
field_vals['wall_time'] = f'{hrs:02d}:{mins:02d}:{secs:02d}'
if proj.notify_all_submitted:
if config.notifications.all_submitted:
field_vals['mail_options'] += 'e'
if proj.pre_submit_script is not None:
field_vals['dependency_directive'] = ('#${directive_prefix} '
Expand All @@ -170,9 +171,9 @@ def wrapper(self):
# 1 day max)
# also, the datetime library sucks
try:
hrs, mins, secs = map(int, proj.wall_time.split(':'))
hrs, mins, secs = map(int, config.pbs_params.wall_time.split(':'))
except ValueError:
mins, secs = map(int, proj.wall_time.split(':'))
mins, secs = map(int, config.pbs_params.wall_time.split(':'))
hrs = 0
total_secs = hrs * 3600 + mins * 60 + secs
double_job_walltime = total_secs * 2
Expand All @@ -181,7 +182,7 @@ def wrapper(self):
mins = monitor_walltime % 3600 // 60
secs = monitor_walltime % 3600 % 60
field_vals['wall_time'] = f'{hrs:02d}:{mins:02d}:{secs:02d}'
if proj.notify_all_finished:
if config.notifications.all_finished:
field_vals['mail_options'] += 'e'


Expand All @@ -191,7 +192,7 @@ def wrapper(self):

# TODO: make sure this checks for proper values in final field format
email_addrs = self._project.config.notifications.email_list
if field_vals['mail_options'] == 'n' or self._project.email_list == 'INFER':
if field_vals['mail_options'] == 'n' or self._project.config.notifications.email_list == 'INFER':
# emails (if any) will be sent to the user's default email address
field_vals['email_directive'] = ''
else:
Expand All @@ -217,95 +218,6 @@ def wrapper(self):



if any(proj_environ):
environ_fmt = ','.join('='.join(item) for item in proj_environ.items()),
# if set, environment variables are exported to all job types
field_vals['environ_export_directive'] = ('#${directive_prefix} -v '
f'{environ_fmt}')
else:
field_vals['environ_export_directive'] = ''
if self.kind == 'runner':
field_vals['job_executable']: proj.job_executable
field_vals['n_nodes']: proj.n_nodes
field_vals['ppn']: proj.ppn
field_vals['wall_time']: proj.wall_time
field_vals['dependency_directive'] = ''
field_vals['hold_directive'] = ''
modules = ' '.join(proj.modules)
if any(modules):
cmd = f"""echo "loading modules: {modules}"
module load ${modules}
""".splitlines()
field_vals['module_load_cmd'] = '\n'.join(map(str.lstrip, cmd))
else:
field_vals['module_load_cmd'] = ''
elif self.kind == 'submitter':
field_vals['job_executable'] = 'python'
field_vals['n_nodes'] = 1
field_vals['ppn'] = 1
# estimate walltime needed to submit all jobs, assuming
# (conservatively) 1 job submitted every 3s plus some
# overhead for module load, conda activate, etc.
total_secs = len(proj.jobs) * 3 + 30
hrs = total_secs // 3600
mins = total_secs % 3600 // 60
secs = total_secs % 3600 % 60
field_vals['wall_time'] = f'{hrs:02d}:{mins:02d}:{secs:02d}'
field_vals['hold_directive'] = ''
if proj.pre_submit_script is not None:
# submit script will receive pre-submit job's jobid ("$1")
field_vals['dependency_directive'] = '#${directive_prefix} -W depend=afterok:$1'
else:
field_vals['dependency_directive'] = ''
field_vals['hold_directive'] = ''
field_vals['module_load_cmd'] = ''
elif self.kind == 'monitor':
...
if self.kind == 'collector':
field_vals['hold_directive'] = '#${directive_prefix} -h'
else:
field_vals['hold_directive'] = ''

# TODO: make sure the spacing/alignment of these multiline
# strings is getting translated properly
full_wrapper_str = Job._wrapper_template.safe_substitute(first_subs)
full_wrapper_template = Template(full_wrapper_str)
# fill in remaining fields
field_vals = {
'directive_prefix': proj.directive_prefix,
'job_name': self.name,
'project_root': proj.root_dir,
'stdout_path': self.stdout_path,
'stderr_path': self.stderr_path,
'queue': proj.queue,
'user': proj.user_to_notify,
'job_type': self.kind,
'modules': ' '.join(proj.modules)
}
# logic for constructing #PBS -m option str based on project
# config values and job type
# this could be written in fewer lines, but this way results in
# fewer namespace lookups, which could potentially add up to
# something significant for extremely long JobLists
# NOTE: the 'all_finished' config field affects params passed to monitor job, rather than
mail_opts = ''
if self.kind == 'submitter' and proj.notify_all_submitted:
mail_opts = 'e'
elif self.kind == 'collector' and proj.notify_collector_finished:
mail_opts = 'e'
elif self.kind == 'runner':
if proj.notify_job_started:
mail_opts += 'b'
if proj.notify_job_finished:
mail_opts += 'e'
if proj.notify_job_aborted:
mail_opts += 'a'
if proj.notify_job_failed:
mail_opts += 'f'
if mail_opts == '':
# no mail will be sent
mail_opts = 'n'
field_vals['mail_options'] = mail_opts
# only have to check one of the two virtual environment commands,
# 'Project.check_submittable()' ensures either both or neither
# is set
Expand All @@ -323,7 +235,7 @@ def wrapper(self):
else:
field_vals['env_activate_cmd'] = ''
field_vals['env_deactivate_cmd'] = ''
return full_wrapper_template.substitute(field_vals)
return partial_template.substitute(field_vals)

# @property
# def wrapper(self):
Expand Down
Loading