Skip to content

Commit

Permalink
Only log count of pending jobs in nodewatcher log
Browse files Browse the repository at this point in the history
* Add an optional parameter to the get_pending_jobs_info function for
each of the scheduler commands modules that enables the suppression of a
log message containing every pending job.
* Add a log message for the count of pending jobs that is always logged.
* Suppress the log message containing all pending jobs when calling the
function from nodewatcher.

Signed-off-by: Tim Lane <tilne@amazon.com>
  • Loading branch information
Tim Lane authored and tilne committed Apr 3, 2020
1 parent 79db0e4 commit bf66971
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 10 deletions.
7 changes: 5 additions & 2 deletions src/common/schedulers/sge_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,16 +207,19 @@ def get_jobs_info(hostname_filter=None, job_state_filter=None):
return [SgeJob.from_xml(ElementTree.tostring(host)) for host in job_info]


def get_pending_jobs_info(max_slots_filter=None, skip_if_state=None):
def get_pending_jobs_info(max_slots_filter=None, skip_if_state=None, log_pending_jobs=True):
"""
Retrieve the list of pending jobs.
:param max_slots_filter: discard jobs that require a number of slots bigger than the given value
:param skip_if_state: discard jobs that are in the given state
:param log_pending_jobs: log the actual list of pending jobs (rather than just a count)
    :return: the list of filtered pending jobs.
"""
pending_jobs = get_jobs_info(job_state_filter="p")
logging.info("Retrieved the following original pending jobs: {0}".format(pending_jobs))
logging.info("Retrieved {0} pending jobs".format(len(pending_jobs)))
if log_pending_jobs:
logging.info("The pending jobs are: {0}".format(pending_jobs))
if max_slots_filter or skip_if_state:
filtered_jobs = []
for job in pending_jobs:
Expand Down
9 changes: 7 additions & 2 deletions src/common/schedulers/slurm_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def get_jobs_info(job_state_filter=None):
return SlurmJob.from_table(output)


def get_pending_jobs_info(instance_properties=None, max_nodes_filter=None, filter_by_pending_reasons=None):
def get_pending_jobs_info(
instance_properties=None, max_nodes_filter=None, filter_by_pending_reasons=None, log_pending_jobs=True
):
"""
Retrieve the list of pending jobs from the Slurm scheduler.
Expand All @@ -71,10 +73,13 @@ def get_pending_jobs_info(instance_properties=None, max_nodes_filter=None, filte
    :param instance_properties: properties (e.g. "slots", "gpus") of a compute node, used to recompute required nodes.
:param max_nodes_filter: max number of nodes in the cluster.
:param filter_by_pending_reasons: retrieve only jobs with the following pending reasons.
:param log_pending_jobs: log the actual list of pending jobs (rather than just a count)
:return: array of filtered SlurmJobs
"""
pending_jobs = get_jobs_info(job_state_filter="PD")
logging.info("Retrieved the following original pending jobs: {0}".format(pending_jobs))
logging.info("Retrieved {0} pending jobs".format(len(pending_jobs)))
if log_pending_jobs:
logging.info("The pending jobs are: {0}".format(pending_jobs))
if instance_properties:
_recompute_required_nodes_by_slots_reservation(pending_jobs, instance_properties["slots"])
_recompute_required_nodes_by_gpu_reservation(pending_jobs, instance_properties["gpus"])
Expand Down
12 changes: 10 additions & 2 deletions src/common/schedulers/torque_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,17 @@ def get_jobs_info(filter_by_states=None, filter_by_exec_hosts=None):
return jobs_list


def get_pending_jobs_info(max_slots_filter=None):
def get_pending_jobs_info(max_slots_filter=None, log_pending_jobs=True):
"""
    Retrieve the list of pending jobs from the Torque scheduler.
:param max_slots_filter: discard jobs that require a number of slots bigger than the given value
:param log_pending_jobs: log the actual list of pending jobs (rather than just a count)
"""
jobs = get_jobs_info(filter_by_states=[TORQUE_PENDING_JOB_STATE])
logging.info("Retrieved the following original pending jobs: {0}".format(jobs))
logging.info("Retrieved {0} pending jobs".format(len(jobs)))
if log_pending_jobs:
logging.info("The pending jobs are: {0}".format(jobs))
pending_jobs = []
for job in jobs:
# filtering of ncpus option is already done by the scheduler at job submission time
Expand Down
4 changes: 3 additions & 1 deletion src/nodewatcher/plugins/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def has_pending_jobs(instance_properties, max_size):
"""
try:
max_cluster_slots = max_size * instance_properties.get("slots")
pending_jobs = get_pending_jobs_info(max_slots_filter=max_cluster_slots, skip_if_state=SGE_HOLD_STATE)
pending_jobs = get_pending_jobs_info(
max_slots_filter=max_cluster_slots, skip_if_state=SGE_HOLD_STATE, log_pending_jobs=False
)
return len(pending_jobs) > 0, False
except Exception as e:
log.error("Failed when checking for pending jobs with exception %s. Reporting no pending jobs.", e)
Expand Down
1 change: 1 addition & 0 deletions src/nodewatcher/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def has_pending_jobs(instance_properties, max_size):
instance_properties=instance_properties,
max_nodes_filter=max_size,
filter_by_pending_reasons=PENDING_RESOURCES_REASONS,
log_pending_jobs=False,
)
return len(pending_jobs) > 0, False
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/nodewatcher/plugins/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def has_pending_jobs(instance_properties, max_size):
an error when checking for pending jobs.
"""
try:
pending_jobs = get_pending_jobs_info(max_slots_filter=instance_properties.get("slots"))
pending_jobs = get_pending_jobs_info(max_slots_filter=instance_properties.get("slots"), log_pending_jobs=False)
return len(pending_jobs) > 0, False
except Exception as e:
log.error("Failed when checking for pending jobs with exception %s. Reporting no pending jobs.", e)
Expand Down
4 changes: 3 additions & 1 deletion tests/nodewatcher/plugins/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def test_has_pending_jobs(pending_jobs, expected_result, mocker):

assert_that(has_pending_jobs(instance_properties, max_cluster_size)).is_equal_to(expected_result)
mock.assert_called_with(
max_slots_filter=max_cluster_size * instance_properties["slots"], skip_if_state=SGE_HOLD_STATE
max_slots_filter=max_cluster_size * instance_properties["slots"],
skip_if_state=SGE_HOLD_STATE,
log_pending_jobs=False,
)


Expand Down
1 change: 1 addition & 0 deletions tests/nodewatcher/plugins/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ def test_has_pending_jobs(pending_jobs, expected_result, mocker):
filter_by_pending_reasons=PENDING_RESOURCES_REASONS,
max_nodes_filter=max_cluster_size,
instance_properties=instance_properties,
log_pending_jobs=False,
)
2 changes: 1 addition & 1 deletion tests/nodewatcher/plugins/test_torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def test_has_pending_jobs(pending_jobs, expected_result, mocker):
max_cluster_size = 10

assert_that(has_pending_jobs(instance_properties, max_cluster_size)).is_equal_to(expected_result)
mock.assert_called_with(max_slots_filter=instance_properties["slots"])
mock.assert_called_with(max_slots_filter=instance_properties["slots"], log_pending_jobs=False)


@pytest.mark.parametrize(
Expand Down

0 comments on commit bf66971

Please sign in to comment.