Add new attributes to db.Jobs and truncate_time option to db.JobFilter #321

Merged · 4 commits · Sep 8, 2023
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
- New Classes to interact with Database QoS (WIP)
- `pyslurm.db.QualityOfService`
- `pyslurm.db.QualitiesOfService`
- Add `truncate_time` option to `pyslurm.db.JobFilter`, which is the same as
the `-T` / `--truncate` option from `sacct`.
- Add new attributes to `pyslurm.db.Jobs` that make gathering statistics for a
collection of Jobs more convenient.
- Fix `allocated_gres` attribute in the `pyslurm.Node` class returning nothing.
- Add new `idle_memory` and `allocated_tres` attributes to `pyslurm.Node` class.
- Fix Node State being displayed as `ALLOCATED` when it should actually be
`MIXED`.

## [23.2.2](https://github.com/PySlurm/pyslurm/releases/tag/v23.2.2) - 2023-07-18

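A quick sketch of the two additions above. Setting the filter attributes follows the declarations in this PR; the concrete `start_time`/`end_time` values and their string format are illustrative assumptions:

```python
import pyslurm

# Filter accounting jobs in a one-week window; truncate_time clips each
# job's start/end to the window boundaries, like `sacct -T/--truncate`.
db_filter = pyslurm.db.JobFilter()
db_filter.truncate_time = True
db_filter.start_time = "2023-09-01T00:00:00"  # hypothetical value/format
db_filter.end_time = "2023-09-08T00:00:00"    # hypothetical value/format

jobs = pyslurm.db.Jobs.load(db_filter)
```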
61 changes: 58 additions & 3 deletions pyslurm/db/job.pxd
@@ -123,6 +123,12 @@ cdef class JobFilter:
Instruct the slurmdbd to also send the job environment(s)
Note: This requires explicitly specifying job ids, and is mutually
exclusive with `with_script`
truncate_time (bool):
Truncate start and end time.
For example, when a Job actually started before the requested
`start_time`, its start time will be truncated to `start_time`. The
same logic applies to `end_time`. This is like the `-T` / `--truncate`
option from `sacct`.
"""
cdef slurmdb_job_cond_t *ptr

@@ -149,11 +155,60 @@
nodelist
with_script
with_env
truncate_time


cdef class Jobs(MultiClusterMap):
"""A [`Multi Cluster`][pyslurm.xcollections.MultiClusterMap] collection of [pyslurm.db.Job][] objects."""
pass
"""A [`Multi Cluster`][pyslurm.xcollections.MultiClusterMap] collection of [pyslurm.db.Job][] objects.

Args:
jobs (Union[list[int], dict[int, pyslurm.db.Job], str], optional=None):
Jobs to initialize this collection with.

Attributes:
consumed_energy (int):
Total amount of energy consumed, in joules.
disk_read (int):
Total amount of bytes read.
disk_write (int):
Total amount of bytes written.
page_faults (int):
Total amount of page faults.
resident_memory (int):
Total Resident Set Size (RSS) used in bytes.
virtual_memory (int):
Total Virtual Memory Size (VSZ) used in bytes.
elapsed_cpu_time (int):
Total amount of time used (elapsed time multiplied by CPU count), in
seconds. This is not the real CPU efficiency, but rather the total
amount of CPU time the CPUs were occupied for.
total_cpu_time (int):
Sum of `user_cpu_time` and `system_cpu_time`, in seconds.
user_cpu_time (int):
Total amount of time spent in user space, in seconds.
system_cpu_time (int):
Total amount of time spent in kernel space, in seconds.
cpus (int):
Total amount of CPUs.
nodes (int):
Total amount of nodes.
memory (int):
Total amount of requested memory in Mebibytes.
"""
cdef public:
consumed_energy
disk_read
disk_write
page_faults
resident_memory
virtual_memory
elapsed_cpu_time
total_cpu_time
user_cpu_time
system_cpu_time
cpus
nodes
memory


cdef class Job:
@@ -252,7 +307,7 @@ cdef class Job:
Amount of CPUs the Job has/had allocated, or, if the Job is still
pending, this will reflect the amount requested.
memory (int):
Amount of memory the Job requested in total
Amount of memory the Job requested in total, in Mebibytes
reservation (str):
Name of the Reservation for this Job
script (str):
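The collection-level attributes declared above can be read directly after loading; a minimal sketch (attribute names are taken from this diff, and the usage ratio at the end is an illustrative derivation, not a pyslurm API):

```python
import pyslurm

jobs = pyslurm.db.Jobs.load()

# Each attribute is the sum over all Jobs in the collection.
print(f"Jobs loaded:      {len(jobs)}")
print(f"Total CPUs:       {jobs.cpus}")
print(f"Total CPU time:   {jobs.total_cpu_time}s")
print(f"Elapsed CPU time: {jobs.elapsed_cpu_time}s")

# Rough whole-collection CPU usage ratio (illustrative only).
if jobs.elapsed_cpu_time:
    print(f"CPU usage: {jobs.total_cpu_time / jobs.elapsed_cpu_time:.1%}")
```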
33 changes: 30 additions & 3 deletions pyslurm/db/job.pyx
@@ -29,6 +29,10 @@ from typing import Any
from pyslurm.utils.uint import *
from pyslurm.settings import LOCAL_CLUSTER
from pyslurm import xcollections
from pyslurm.db.stats import (
reset_stats_for_job_collection,
add_stats_to_job_collection,
)
from pyslurm.utils.ctime import (
date_to_timestamp,
timestr_to_mins,
@@ -146,6 +150,9 @@ cdef class JobFilter:
if self.nodelist:
cstr.fmalloc(&ptr.used_nodes,
nodelist_to_range_str(self.nodelist))

if self.truncate_time:
ptr.flags &= ~slurm.JOBCOND_FLAG_NO_TRUNC

if self.ids:
# These are only allowed by the slurmdbd when specific jobs are
@@ -196,6 +203,7 @@ cdef class Jobs(MultiClusterMap):
val_type=Job,
id_attr=Job.id,
key_type=int)
self._reset_stats()

@staticmethod
def load(JobFilter db_filter=None, Connection db_connection=None):
@@ -275,15 +283,35 @@
job = Job.from_ptr(<slurmdb_job_rec_t*>job_ptr.data)
job.qos_data = qos_data
job._create_steps()
JobStatistics._sum_step_stats_for_job(job, job.steps)
job.stats = JobStatistics.from_job_steps(job)

cluster = job.cluster
if cluster not in out.data:
out.data[cluster] = {}
out[cluster][job.id] = job

add_stats_to_job_collection(out, job.stats)
out.cpus += job.cpus
out.nodes += job.num_nodes
out.memory += job.memory

return out

def _reset_stats(self):
reset_stats_for_job_collection(self)
self.cpus = 0
self.nodes = 0
self.memory = 0

def calc_stats(self):
"""(Re)Calculate Statistics for the Job Collection."""
self._reset_stats()
for job in self.values():
add_stats_to_job_collection(self, job.stats)
self.cpus += job.cpus
self.nodes += job.num_nodes
self.memory += job.memory

@staticmethod
def modify(db_filter, Job changes, db_connection=None):
"""Modify Slurm database Jobs.
@@ -445,7 +473,6 @@ cdef class Job:
cdef Job wrap = Job.__new__(Job)
wrap.ptr = in_ptr
wrap.steps = JobSteps.__new__(JobSteps)
wrap.stats = JobStatistics()
return wrap

@staticmethod
@@ -738,7 +765,7 @@ cdef class Job:
else:
# Job is still pending, so we return the number of requested cpus
# instead.
return u32_parse(self.ptr.req_cpus)
return u32_parse(self.ptr.req_cpus, on_noval=0, zero_is_noval=False)

@property
def memory(self):
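Since the statistics are summed once while loading, `calc_stats` recomputes them after the collection changes. A hedged sketch; the dict-style removal assumes the `MultiClusterMap` interface supports `del`:

```python
import pyslurm

jobs = pyslurm.db.Jobs.load()

# Hypothetical: keep only multi-node jobs, then refresh the aggregates.
for job in list(jobs.values()):
    if job.num_nodes == 1:
        del jobs[job.id]  # assumed MultiClusterMap deletion semantics

jobs.calc_stats()
print(jobs.nodes, jobs.memory)
```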
3 changes: 3 additions & 0 deletions pyslurm/db/stats.pxd
@@ -139,6 +139,9 @@ cdef class JobStatistics:
user_cpu_time
system_cpu_time

@staticmethod
cdef JobStatistics from_job_steps(Job job)

@staticmethod
cdef JobStatistics from_step(JobStep step)

143 changes: 86 additions & 57 deletions pyslurm/db/stats.pyx
@@ -28,6 +28,32 @@ from pyslurm.utils.helpers import (
)


def reset_stats_for_job_collection(jobs):
jobs.consumed_energy = 0
jobs.disk_read = 0
jobs.disk_write = 0
jobs.page_faults = 0
jobs.resident_memory = 0
jobs.virtual_memory = 0
jobs.elapsed_cpu_time = 0
jobs.total_cpu_time = 0
jobs.user_cpu_time = 0
jobs.system_cpu_time = 0


def add_stats_to_job_collection(jobs, JobStatistics js):
jobs.consumed_energy += js.consumed_energy
jobs.disk_read += js.avg_disk_read
jobs.disk_write += js.avg_disk_write
jobs.page_faults += js.avg_page_faults
jobs.resident_memory += js.avg_resident_memory
jobs.virtual_memory += js.avg_virtual_memory
jobs.elapsed_cpu_time += js.elapsed_cpu_time
jobs.total_cpu_time += js.total_cpu_time
jobs.user_cpu_time += js.user_cpu_time
jobs.system_cpu_time += js.system_cpu_time


cdef class JobStatistics:

def __init__(self):
@@ -50,6 +76,21 @@ cdef class JobStatistics:
def to_dict(self):
return instance_to_dict(self)

@staticmethod
cdef JobStatistics from_job_steps(Job job):
cdef JobStatistics job_stats = JobStatistics()

for step in job.steps.values():
job_stats._add_base_stats(step.stats)

job_stats._sum_cpu_time(job)

step_count = len(job.steps)
if step_count:
job_stats.avg_cpu_frequency /= step_count

return job_stats

@staticmethod
cdef JobStatistics from_step(JobStep step):
cdef JobStatistics wrap = JobStatistics()
@@ -140,68 +181,56 @@

return wrap

@staticmethod
def _sum_step_stats_for_job(Job job, JobSteps steps):
cdef:
JobStatistics job_stats = job.stats
JobStatistics step_stats = None

for step in steps.values():
step_stats = step.stats

job_stats.consumed_energy += step_stats.consumed_energy
job_stats.avg_cpu_time += step_stats.avg_cpu_time
job_stats.avg_cpu_frequency += step_stats.avg_cpu_frequency
job_stats.avg_disk_read += step_stats.avg_disk_read
job_stats.avg_disk_write += step_stats.avg_disk_write
job_stats.avg_page_faults += step_stats.avg_page_faults

if step_stats.max_disk_read >= job_stats.max_disk_read:
job_stats.max_disk_read = step_stats.max_disk_read
job_stats.max_disk_read_node = step_stats.max_disk_read_node
job_stats.max_disk_read_task = step_stats.max_disk_read_task

if step_stats.max_disk_write >= job_stats.max_disk_write:
job_stats.max_disk_write = step_stats.max_disk_write
job_stats.max_disk_write_node = step_stats.max_disk_write_node
job_stats.max_disk_write_task = step_stats.max_disk_write_task

if step_stats.max_page_faults >= job_stats.max_page_faults:
job_stats.max_page_faults = step_stats.max_page_faults
job_stats.max_page_faults_node = step_stats.max_page_faults_node
job_stats.max_page_faults_task = step_stats.max_page_faults_task

if step_stats.max_resident_memory >= job_stats.max_resident_memory:
job_stats.max_resident_memory = step_stats.max_resident_memory
job_stats.max_resident_memory_node = step_stats.max_resident_memory_node
job_stats.max_resident_memory_task = step_stats.max_resident_memory_task
job_stats.avg_resident_memory = job_stats.max_resident_memory

if step_stats.max_virtual_memory >= job_stats.max_virtual_memory:
job_stats.max_virtual_memory = step_stats.max_virtual_memory
job_stats.max_virtual_memory_node = step_stats.max_virtual_memory_node
job_stats.max_virtual_memory_task = step_stats.max_virtual_memory_task
job_stats.avg_virtual_memory = job_stats.max_virtual_memory

if step_stats.min_cpu_time >= job_stats.min_cpu_time:
job_stats.min_cpu_time = step_stats.min_cpu_time
job_stats.min_cpu_time_node = step_stats.min_cpu_time_node
job_stats.min_cpu_time_task = step_stats.min_cpu_time_task

def _add_base_stats(self, JobStatistics src):
self.consumed_energy += src.consumed_energy
self.avg_cpu_time += src.avg_cpu_time
self.avg_cpu_frequency += src.avg_cpu_frequency
self.avg_disk_read += src.avg_disk_read
self.avg_disk_write += src.avg_disk_write
self.avg_page_faults += src.avg_page_faults

if src.max_disk_read >= self.max_disk_read:
self.max_disk_read = src.max_disk_read
self.max_disk_read_node = src.max_disk_read_node
self.max_disk_read_task = src.max_disk_read_task

if src.max_disk_write >= self.max_disk_write:
self.max_disk_write = src.max_disk_write
self.max_disk_write_node = src.max_disk_write_node
self.max_disk_write_task = src.max_disk_write_task

if src.max_page_faults >= self.max_page_faults:
self.max_page_faults = src.max_page_faults
self.max_page_faults_node = src.max_page_faults_node
self.max_page_faults_task = src.max_page_faults_task

if src.max_resident_memory >= self.max_resident_memory:
self.max_resident_memory = src.max_resident_memory
self.max_resident_memory_node = src.max_resident_memory_node
self.max_resident_memory_task = src.max_resident_memory_task
self.avg_resident_memory = self.max_resident_memory

if src.max_virtual_memory >= self.max_virtual_memory:
self.max_virtual_memory = src.max_virtual_memory
self.max_virtual_memory_node = src.max_virtual_memory_node
self.max_virtual_memory_task = src.max_virtual_memory_task
self.avg_virtual_memory = self.max_virtual_memory

if src.min_cpu_time >= self.min_cpu_time:
self.min_cpu_time = src.min_cpu_time
self.min_cpu_time_node = src.min_cpu_time_node
self.min_cpu_time_task = src.min_cpu_time_task

def _sum_cpu_time(self, Job job):
if job.ptr.tot_cpu_sec != slurm.NO_VAL64:
job_stats.total_cpu_time = job.ptr.tot_cpu_sec
self.total_cpu_time += job.ptr.tot_cpu_sec

if job.ptr.user_cpu_sec != slurm.NO_VAL64:
job_stats.user_cpu_time = job.ptr.user_cpu_sec
self.user_cpu_time += job.ptr.user_cpu_sec

if job.ptr.sys_cpu_sec != slurm.NO_VAL64:
job_stats.system_cpu_time = job.ptr.sys_cpu_sec
self.system_cpu_time += job.ptr.sys_cpu_sec

elapsed = job.elapsed_time if job.elapsed_time else 0
cpus = job.cpus if job.cpus else 0
job_stats.elapsed_cpu_time = elapsed * cpus

step_count = len(steps)
if step_count:
job_stats.avg_cpu_frequency /= step_count

self.elapsed_cpu_time += elapsed * cpus
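For intuition, the roll-up above reduces to plain sums, maxima, and one average; a standalone plain-Python sketch with made-up numbers:

```python
# Made-up per-step numbers mirroring the aggregation logic above.
steps = [
    {"avg_cpu_frequency": 2600, "max_resident_memory": 4 * 1024**3},
    {"avg_cpu_frequency": 3000, "max_resident_memory": 6 * 1024**3},
]

# avg_cpu_frequency is summed per step, then divided by the step count.
avg_cpu_frequency = sum(s["avg_cpu_frequency"] for s in steps) / len(steps)

# Maxima simply keep the largest value seen across steps.
max_resident_memory = max(s["max_resident_memory"] for s in steps)

# elapsed_cpu_time is wall-clock elapsed time multiplied by CPU count:
# a job running 2 hours on 8 CPUs occupied 57600 CPU-seconds.
elapsed, cpus = 2 * 3600, 8
elapsed_cpu_time = elapsed * cpus

print(avg_cpu_frequency, max_resident_memory, elapsed_cpu_time)
# 2800.0 10737418240 57600
```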