Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce unknown job status to handle communication problem with resource manager #179

Closed
wants to merge 8 commits into from
92 changes: 53 additions & 39 deletions batchspawner/batchspawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import xml.etree.ElementTree as ET

from enum import Enum

from jinja2 import Template

from tornado import gen
Expand Down Expand Up @@ -55,6 +57,11 @@ def format_template(template, *args, **kwargs):
return Template(template).render(*args, **kwargs)
return template.format(*args, **kwargs)

class JobStatus(Enum):
NOTQUEUED = 0
RUNNING = 1
PENDING = 2
UNKNOWN = 3
rkdarst marked this conversation as resolved.
Show resolved Hide resolved

class BatchSpawnerBase(Spawner):
"""Base class for spawners using resource manager batch job submission mechanisms
Expand Down Expand Up @@ -256,30 +263,37 @@ async def submit_batch_script(self):
self.job_id = ''
return self.job_id

# Override if your batch system needs something more elaborate to read the job status
# Override if your batch system needs something more elaborate to query the job status
batch_query_cmd = Unicode('',
help="Command to run to read job status. Formatted using req_xyz traits as {xyz} "
help="Command to run to query job status. Formatted using req_xyz traits as {xyz} "
"and self.job_id as {job_id}."
).tag(config=True)

async def read_job_state(self):
async def query_job_status(self):
cmd-ntrf marked this conversation as resolved.
Show resolved Hide resolved
if self.job_id is None or len(self.job_id) == 0:
# job not running
self.job_status = ''
return self.job_status
return JobStatus.NOTQUEUED
subvars = self.get_req_subvars()
subvars['job_id'] = self.job_id
cmd = ' '.join((format_template(self.exec_prefix, **subvars),
format_template(self.batch_query_cmd, **subvars)))
self.log.debug('Spawner querying job: ' + cmd)
try:
out = await self.run_command(cmd)
self.job_status = out
self.job_status = await self.run_command(cmd)
except RuntimeError as e:
self.job_status = e.args[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.job_status = e.args[0]
# e.args[0] is stderr from the process
self.job_status = e.args[0]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this change doesn't appear, or am I misinterperting something?

except Exception as e:
self.log.error('Error querying job ' + self.job_id)
self.job_status = ''
finally:
return self.job_status

if self.state_isrunning():
return JobStatus.RUNNING
elif self.state_ispending():
return JobStatus.PENDING
elif self.state_isunknown():
return JobStatus.UNKNOWN
else:
return JobStatus.NOTQUEUED
rkdarst marked this conversation as resolved.
Show resolved Hide resolved

batch_cancel_cmd = Unicode('',
help="Command to stop/cancel a previously submitted job. Formatted like batch_query_cmd."
Expand Down Expand Up @@ -326,22 +340,20 @@ def state_isrunning(self):
"Return boolean indicating if job is running, likely by parsing self.job_status"
raise NotImplementedError("Subclass must provide implementation")

def state_isunknown(self):
"Return boolean indicating if job state retrieval failed because of the resource manager"
raise False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise False
return False

I'm not sure if you meant this to be return False or raise NotImplementedError(). I think it makes sense that this defaults to False, it doesn't need to be implemented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also seems to be not applied. And I got another idea, it should be None if not implemented. Equivalent to False for practical purposes in a boolean context, but if someone really wanted to know if it was implemented or not, then they can.

Suggested change
raise False
return None


def state_gethost(self):
"Return string, hostname or addr of running job, likely by parsing self.job_status"
raise NotImplementedError("Subclass must provide implementation")

async def poll(self):
"""Poll the process"""
if self.job_id is not None and len(self.job_id) > 0:
await self.read_job_state()
if self.state_isrunning() or self.state_ispending():
return None
else:
self.clear_state()
return 1

if not self.job_id:
# no job id means it's not running
status = await self.query_job_status()
if status in (JobStatus.PENDING, JobStatus.RUNNING, JobStatus.UNKNOWN):
return None
else:
self.clear_state()
return 1

Expand All @@ -366,18 +378,19 @@ async def start(self):
if len(self.job_id) == 0:
raise RuntimeError("Jupyter batch job submission failure (no jobid in output)")
while True:
await self.poll()
if self.state_isrunning():
status = await self.query_job_status()
if status == JobStatus.RUNNING:
break
elif status == JobStatus.PENDING:
self.log.debug('Job ' + self.job_id + ' still pending')
elif status == JobStatus.UNKNOWN:
self.log.debug('Job ' + self.job_id + ' still unknown')
else:
if self.state_ispending():
self.log.debug('Job ' + self.job_id + ' still pending')
else:
self.log.warning('Job ' + self.job_id + ' neither pending nor running.\n' +
self.job_status)
raise RuntimeError('The Jupyter batch job has disappeared'
' while pending in the queue or died immediately'
' after starting.')
self.log.warning('Job ' + self.job_id + ' neither pending nor running.\n' +
self.job_status)
raise RuntimeError('The Jupyter batch job has disappeared'
' while pending in the queue or died immediately'
' after starting.')
await gen.sleep(self.startup_poll_interval)

self.ip = self.state_gethost()
Expand Down Expand Up @@ -410,8 +423,8 @@ async def stop(self, now=False):
if now:
return
for i in range(10):
await self.poll()
if not self.state_isrunning():
status = await self.query_job_status()
if not status in (JobStatus.RUNNING, JobStatus.UNKNOWN):
return
await gen.sleep(1.0)
if self.job_id:
Expand Down Expand Up @@ -467,20 +480,20 @@ class BatchSpawnerRegexStates(BatchSpawnerBase):
If this variable is set, the match object will be expanded using this string
to obtain the notebook IP.
See Python docs: re.match.expand""").tag(config=True)
state_unknown_re = Unicode('^$',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
state_unknown_re = Unicode('^$',
state_unknown_re = Unicode('',

As below, I guess this should be a false-like value if it's not set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the regex is an empty string, it will match any string and state_isunknown will always return True. The regex currently implemented will only match empty string, which I think is a good case to conclude the state of the job is unknown when querying the resource manager.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see what I was missing. If it was an empty string, in my mind it should be considered unset, and then not checked and it is never "unknown". I think "unknown" should be an optional state, and if it's not set, it will simply never return true.

help="Regex that matches job_status if the resource manager is not answering").tag(config=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
help="Regex that matches job_status if the resource manager is not answering").tag(config=True)
help="Regex that matches job_status if the resource manager is not answering. An empty string means 'not in use'.").tag(config=True)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^-- this is what empty string means


def state_ispending(self):
assert self.state_pending_re, "Misconfigured: define state_running_re"
if self.job_status and re.search(self.state_pending_re, self.job_status):
return True
else:
return False
return self.job_status and re.search(self.state_pending_re, self.job_status)

def state_isrunning(self):
assert self.state_running_re, "Misconfigured: define state_running_re"
if self.job_status and re.search(self.state_running_re, self.job_status):
return True
else:
return False
return self.job_status and re.search(self.state_running_re, self.job_status)

def state_isunknown(self):
assert self.state_unknown_re, "Misconfigured: define state_unknown_re"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert self.state_unknown_re, "Misconfigured: define state_unknown_re"
if not self.state_unknown_re:
return False

I guess this doesn't need to be implemented for all spawners... it goes back to default behavior if it's not here.

return self.job_status and re.search(self.state_unknown_re, self.job_status)
Comment on lines +495 to +497
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def state_isunknown(self):
assert self.state_unknown_re, "Misconfigured: define state_unknown_re"
return self.job_status and re.search(self.state_unknown_re, self.job_status)
def state_isunknown(self):
if self.state_unknown_re:
return self.job_status and re.search(self.state_unknown_re, self.job_status)

Changed so that if state_unknown_re is an empty string, it will always be false.

Does it need to self.job_status and here? Let's see, this would only matter if it's an empty string. In that case, I guess the regex hopefully wouldn't match anyway. BUT- this saves us from an exception if self.job_status is None. So I guess it's correct as-is. Anyone disagree?


def state_gethost(self):
assert self.state_exechost_re, "Misconfigured: define state_exechost_re"
Expand Down Expand Up @@ -634,6 +647,7 @@ class SlurmSpawner(UserEnvMixin,BatchSpawnerRegexStates):
# RUNNING, COMPLETING = running
state_pending_re = Unicode(r'^(?:PENDING|CONFIGURING)').tag(config=True)
state_running_re = Unicode(r'^(?:RUNNING|COMPLETING)').tag(config=True)
state_unknown_re = Unicode(r'^slurm_load_jobs error: (?:Socket timed out on send/recv|Unable to contact slurm controller)').tag(config=True)
state_exechost_re = Unicode(r'\s+((?:[\w_-]+\.?)+)$').tag(config=True)

def parse_job_id(self, output):
Expand Down