If you haven't already, download the files and change your working directory:
$ git clone https://github.com/flux-framework/flux-workflow-examples.git
$ cd flux-workflow-examples/job-status-control
- Allocate three nodes from a resource manager:
salloc -N3 -p pdebug
- Launch a Flux instance on the current allocation by running `flux start` once per node, redirecting log messages to the file `out` in the current directory:
srun --pty --mpi=none -N3 flux start -o,-S,log-filename=out
- Run the bookkeeper executable, specifying the number of jobs to be submitted (if no number is specified, 6 jobs are submitted: 3 instances of compute.py and 3 instances of io-forwarding.py):
./bookkeeper.py 2
bookkeeper: all jobs submitted
bookkeeper: waiting until all jobs complete
job 39040581632 triggered event 'submit'
job 39040581633 triggered event 'submit'
job 39040581632 triggered event 'depend'
job 39040581632 triggered event 'priority'
job 39040581632 triggered event 'alloc'
job 39040581633 triggered event 'depend'
job 39040581633 triggered event 'priority'
job 39040581633 triggered event 'alloc'
job 39040581632 triggered event 'start'
job 39040581633 triggered event 'start'
job 39040581632 triggered event 'finish'
job 39040581633 triggered event 'finish'
job 39040581633 triggered event 'release'
job 39040581633 triggered event 'free'
job 39040581633 triggered event 'clean'
job 39040581632 triggered event 'release'
job 39040581632 triggered event 'free'
job 39040581632 triggered event 'clean'
bookkeeper: all jobs completed
- The following constructs a job request using the `JobspecV1` class, with customizable parameters for how you want to utilize the resources allocated for your job:
compute_jobreq = JobspecV1.from_command(
command=["./compute.py", "10"], num_tasks=4, num_nodes=2, cores_per_task=2
)
compute_jobreq.cwd = os.getcwd()
compute_jobreq.environment = dict(os.environ)
- `with FluxExecutor() as executor:` creates a new `FluxExecutor`, which can be used to submit jobs, wait for them to complete, and get event updates. Using the executor as a context manager (`with ... as ...:`) ensures it is shut down properly.
- `executor.submit(compute_jobreq)` returns a `concurrent.futures.Future` subclass which completes when the underlying job is done. The jobid of the underlying job can be fetched with the `.jobid([timeout])` method, which waits until the jobid is ready.
- Throughout the course of a job, various events occur to it. `future.add_event_callback(event, event_callback)` adds a callback which will be invoked when the given event occurs.
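Because the future returned by `executor.submit` is a `concurrent.futures.Future` subclass, the standard `Future` interface (`result()`, `add_done_callback()`, and so on) applies to it. The sketch below illustrates that shared interface using the standard-library `ThreadPoolExecutor` as a stand-in, since `FluxExecutor` itself needs a live Flux instance to run; the `fake_job` function and `on_done` callback are hypothetical names for illustration. The Flux-specific additions (`.jobid()`, `.add_event_callback()`) layer on top of this same interface.

```python
# Sketch of the concurrent.futures.Future interface that the
# executor's futures extend. ThreadPoolExecutor stands in for
# FluxExecutor so this runs without a Flux instance.
from concurrent.futures import ThreadPoolExecutor

def fake_job(x):
    """Stand-in for a submitted job (hypothetical workload)."""
    return x * 2

completed = []

def on_done(future):
    # Invoked once the future completes; at that point result()
    # returns immediately without blocking. This is the stdlib
    # analogue of reacting to a job reaching its final event.
    completed.append(future.result())

with ThreadPoolExecutor() as executor:
    future = executor.submit(fake_job, 21)
    future.add_done_callback(on_done)  # fires when the "job" finishes
    result = future.result()           # blocks until completion

print(result)     # 42
print(completed)  # [42]
```

The same pattern carries over to `FluxExecutor`: submit a jobspec, attach callbacks, and block on `result()` (or wait on many futures) until the underlying jobs complete.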