An automation framework for running sequential metagenome analysis jobs and making the outputs available as metadata in the NMDC database and as data objects on the NMDC data portal.
- nmdc_automation
- mongodb-community needs to be installed and running on the local machine
- Python 3.11 or later
- Poetry version 2.2.1
Poetry Installation instructions can be found Here
Install MongoDB using Homebrew on MacOS:
brew tap mongodb/brew
brew install mongodb-community
brew services start mongodb-community
Full MongoDB installation instructions for Mac can be found here
Ensure that the mongodb service is running:
brew services start mongodb-community
- Clone the repository
git clone https://github.com/microbiomedata/nmdc_automation.git
- Install the required packages
cd nmdc_automation
poetry install
- Activate the poetry environment
eval $(poetry env activate)
- Run the tests
make test
The Scheduler polls the NMDC database based upon an Allowlist of DataGeneration IDs. For each allowed data-generation ID, the Scheduler examines the WorkflowExecutions and DataObjects that are linked to the data generation through was_informed_by, and builds a graph of Workflow Process Nodes.
A Workflow Process Node is a representation of:
- workflow - the workflow configuration, from workflows.yaml. The "recipe" for the given type of analysis
  - workflow.children - the child workflow recipes that can be run after this workflow
- process - the planned process, from the NMDC database. The "instance" of a workflow execution or data generation from the NMDC database
- parent - the parent workflow process node, if any
- children - the child workflow process nodes, if any
Workflow Process Node Mermaid Diagram:
erDiagram
WorkflowProcessNode ||--|| PlannedProcess: "process"
PlannedProcess ||--|{ DataObject: "has_input / has_output"
WorkflowProcessNode }|--|| WorkflowConfig: "workflow"
WorkflowConfig ||--o{ WorkflowConfig: "children"
WorkflowProcessNode |o--o| WorkflowProcessNode: "parent"
WorkflowProcessNode |o--o{ WorkflowProcessNode: "children"
When the Scheduler finds a node where:
- The node has a workflow configuration in node.workflow.children
- The node DOES NOT have a child node in node.children
- The required inputs for the child workflow are available in node's process outputs
Scheduler Process Mermaid Diagram:
erDiagram
WPNode_Sequencing ||--|| WPNode_ReadsQC: "children nodes"
WPNode_Sequencing ||--|| WConfig_Sequencing: "workflow"
WConfig_Sequencing ||--o{ WConfig_ReadsQC: "children workflows"
WPNode_Sequencing ||--|| Process_Sequencing: "process"
Process_Sequencing ||--|{ SequencingData: "has_output"
WPNode_ReadsQC ||--|| Process_ReadsQC: "process"
Process_ReadsQC ||--|{ SequencingData: "has_input"
Process_ReadsQC ||--|{ ReadsQCData: "has_output"
WPNode_ReadsQC ||--|| WConfig_ReadsQC: "workflow"
WConfig_ReadsQC ||--o{ WConfig_Assembly: "children workflows"
In this case the Scheduler will "schedule" a new job by creating a Job configuration from:
- the workflow configuration from node.workflow.children
- input data from node.data_objects
and writing this to the jobs collection in the NMDC database.
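The three conditions above can be sketched in Python. This is a minimal illustration under stated assumptions, not the project's implementation: the class names follow the diagrams, but the inputs and outputs fields, the function find_schedulable_children, and the data object type names "Raw Reads" and "Filtered Reads" are hypothetical, and data objects are reduced to plain type-name strings.

```python
from dataclasses import dataclass, field


@dataclass
class WorkflowConfig:
    """Hypothetical stand-in for a workflow "recipe" from workflows.yaml."""
    name: str
    inputs: set[str] = field(default_factory=set)  # required data object types (assumed field)
    children: list["WorkflowConfig"] = field(default_factory=list)


@dataclass
class WorkflowProcessNode:
    """Hypothetical stand-in for a workflow process node."""
    workflow: WorkflowConfig
    outputs: set[str] = field(default_factory=set)  # types produced by the process (assumed field)
    children: list["WorkflowProcessNode"] = field(default_factory=list)


def find_schedulable_children(node: WorkflowProcessNode) -> list[WorkflowConfig]:
    """Return the child workflow configs that meet all three conditions."""
    existing = {child.workflow.name for child in node.children}
    return [
        wf for wf in node.workflow.children  # 1. a child workflow is configured
        if wf.name not in existing           # 2. no child node exists for it yet
        and wf.inputs <= node.outputs        # 3. its required inputs are available
    ]


# Illustrative chain: Sequencing -> ReadsQC -> Assembly
assembly = WorkflowConfig("Assembly", inputs={"Filtered Reads"})
reads_qc = WorkflowConfig("ReadsQC", inputs={"Raw Reads"}, children=[assembly])
sequencing = WorkflowConfig("Sequencing", children=[reads_qc])

seq_node = WorkflowProcessNode(sequencing, outputs={"Raw Reads"})
to_schedule = [wf.name for wf in find_schedulable_children(seq_node)]
```

Once a ReadsQC node has been attached to seq_node.children, the same call returns an empty list, which is why a given child workflow is only scheduled once per parent node in this sketch.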
The Watcher "watches" the jobs table in the NMDC database looking for unclaimed jobs. When it finds one, the Watcher creates a WorkflowJob to manage the analysis job. The Watcher then periodically polls each workflow job for its status and processes successful or failed jobs when they complete.
A WorkflowJob consists of a WorkflowStateManager and a JobRunner and is responsible for preparing the
required inputs for an analysis job and submitting it to the job running service.
NMDC's job running service is JAWS.
The JobRunner is also responsible for processing the resulting data and metadata when the job completes.
The Watcher maintains a record of its current activity in a State File.
Site-specific configuration is provided by a .toml file and defines some parameters that are used across the workflow process including
- URL and credentials for NMDC API
- Staging and Data filesystem locations for the site
- Job Runner service URLs
- Path to the state file
Workflow definitions are provided in a .yaml file that describes each analysis step, specifying:
- Name, type, version, WDL and git repository for each workflow
- Inputs, Outputs and Workflow Execution steps
- Data Object Types, description and name templates for processing workflow output data
The Scheduler is a Dockerized application running on Rancher. To initialize the Scheduler for new DataGeneration IDs, follow these steps:
- On Rancher, go to Deployments, select Production from the clusters list, and find the Scheduler in either nmdc or nmdc-dev
- Click on the Scheduler and select run shell
- In the shell, cd /conf
- Update the file allow.lst with the Data Generation IDs that you want to schedule
  - Copy the list of data-generation IDs to your clipboard
  - In the shell, delete the existing allow list: rm allow.lst
  - Replace the file with your copied list: cat >allow.lst
  - Paste your IDs: command-v
  - Ensure a blank line at the end with a return
  - Terminate the cat command using control-d
- The default log level is INFO. If you want to change it to DEBUG for more verbose logging, run the following command: export NMDC_LOG_LEVEL=DEBUG
- Restart the scheduler. In the shell, in /conf: ./run_scheduler.sh
  - If running tests on dev, make sure to check ./run_scheduler.sh -h for options.
- Ensure the scheduler is running by checking sched.log
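The copy, paste, and control-d steps above amount to replacing allow.lst with a list of IDs that ends in a newline. The Python sketch below shows that file format, assuming one ID per line (the steps do not state the format explicitly). The function name write_allowlist is hypothetical, and the example writes to a temporary directory rather than /conf.

```python
import tempfile
from pathlib import Path


def write_allowlist(path: Path, ids: list[str]) -> None:
    """Write one ID per line (assumed format), ending with a newline."""
    # Drop blank entries and duplicates while keeping the original order.
    unique = list(dict.fromkeys(i.strip() for i in ids if i.strip()))
    path.write_text("\n".join(unique) + "\n")


with tempfile.TemporaryDirectory() as tmp:
    allow = Path(tmp) / "allow.lst"
    write_allowlist(allow, ["nmdc:omprc-11-sdyccb57", "nmdc:omprc-11-sdyccb57", ""])
    content = allow.read_text()
```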
The Watcher is a Python application which runs on a login node on Perlmutter.
The following instructions all assume the user is logged in as user nmdcda@perlmutter.nersc.gov
- Get an ssh key - in your home directory: ./sshproxy.sh -u <your_nersc_username> -c nmdcda
- Log in using the key: ssh -i .ssh/nmdcda nmdcda@perlmutter.nersc.gov
Watcher code and config files can be found in:
- /global/homes/n/nmdcda/nmdc_automation/prod
- /global/homes/n/nmdcda/nmdc_automation/dev
- Check the last node the watcher was running on
(base) nmdcda@perlmutter:login07:~> cd nmdc_automation/dev
(base) nmdcda@perlmutter:login07:~/nmdc_automation/dev> cat host-dev.last
login24
- ssh to that node
(base) nmdcda@perlmutter:login07:~/nmdc_automation/dev> ssh login24
- Check for the watcher process
(base) nmdcda@perlmutter:login24:~> ps aux | grep watcher
nmdcda 115825 0.0 0.0 8236 848 pts/94 S+ 09:33 0:00 grep watcher
nmdcda 2044781 0.4 0.0 146420 113668 ? S Mar06 5:42 python -m nmdc_automation.run_process.run_workflows watcher --config /global/homes/n/nmdcda/nmdc_automation/prod/site_configuration_nersc_prod.toml --jaws daemon
nmdcda 2044782 0.0 0.0 5504 744 ? S Mar06 0:00 tee -a watcher-prod.log
- If we are going to shut down the Watcher (without restarting), we need to kill the existing process
(base) nmdcda@perlmutter:login24:~> kill -9 2044781
Note
This will also terminate the tee process that is writing to the log file.
With older versions of the ./run.sh script, restarting the Watcher required manually terminating the existing process with kill -9 2044781. The new run_watcher.sh script handles killing and restarting the Watcher itself.
- Ensure you have the latest nmdc_automation code.
  - cd nmdc_automation
  - git status / git switch main if not on main branch
  - git fetch origin
  - git pull
- Set up the NMDC automation environment with conda and poetry.
  - load conda: eval "$__conda_setup"
  - in the nmdc_automation directory, install the nmdc_automation project with poetry install
  - eval $(poetry env activate) to use the environment
Example Setup:
(nersc-python) nmdcda@perlmutter:login38:~> pwd
/global/homes/n/nmdcda
(nersc-python) nmdcda@perlmutter:login38:~> cd nmdc_automation/dev/
(nersc-python) nmdcda@perlmutter:login38:~/nmdc_automation/dev> eval "$__conda_setup"
(base) nmdcda@perlmutter:login38:~/nmdc_automation/dev> cd nmdc_automation/
(base) nmdcda@perlmutter:login38:~/nmdc_automation/dev/nmdc_automation> poetry install
Installing dependencies from lock file
No dependencies to install or update
Installing the current project: nmdc-automation (0.1.0)
(base) nmdcda@perlmutter:login06:~/nmdc_automation/dev/nmdc_automation> eval $(poetry env activate)
(nmdc-automation-py3.11) (base) nmdcda@perlmutter:login06:~/nmdc_automation/dev/nmdc_automation
The eval $(poetry env activate) command will activate the environment for the current shell session.
The environment name (nmdc-automation-py3.11) will be displayed in the prompt.
We run the Watcher using nohup (No Hangup), which prevents the Watcher process from being terminated
when the user's terminal session ends. This causes stdout and stderr to be written to a file
named nohup.out in addition to being written to the watcher-[dev/prod].log file.
- change to the working prod or dev directory
  - /global/homes/n/nmdcda/nmdc_automation/prod
  - /global/homes/n/nmdcda/nmdc_automation/dev
- rm nohup.out (Long term logging is captured in the watcher-[dev/prod].log file, which is retained)
- nohup ./run_watcher_dev.sh & (for dev) OR nohup ./run_watcher_prod.sh & (for prod)
Same process as Checking the Watcher Status
JAWS is a Cromwell-based service that runs jobs on NERSC and other compute resources. Documentation can be found here.
With the jaws_jobid from the agent.state files, you can check the status of the job in the JAWS service
JAWS Status call:
> jaws status 109288
{
"compute_site_id": "nmdc",
"cpu_hours": null,
"cromwell_run_id": "0fddc559-833e-4e14-9fa5-1e3d485b232d",
"id": 109288,
"input_site_id": "nmdc",
"json_file": "/tmp/tmpeoq9a5p_.json",
"output_dir": null,
"result": null,
"status": "running",
"status_detail": "The run is being executed; you can check `tasks` for more detail",
"submitted": "2025-05-01 11:22:45",
"tag": "nmdc:dgns-11-sm8wyy89/nmdc:wfrqc-11-7fgdsy18.1",
"team_id": "nmdc",
"updated": "2025-05-01 11:40:44",
"user_id": "nmdcda",
"wdl_file": "/tmp/tmpq0l3fk0n.wdl",
"workflow_name": "nmdc_rqcfilter",
"workflow_root": "/pscratch/sd/n/nmjaws/nmdc-prod/cromwell-executions/nmdc_rqcfilter/0fddc559-833e-4e14-9fa5-1e3d485b232d"
}
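When this check is scripted, the JSON printed by jaws status can be parsed with Python's standard library. The sketch below works on a trimmed copy of the example output and uses only fields that appear in it. The helper name summarize_status is hypothetical, and splitting the tag on "/" into two IDs is an assumption based on this one example, not a documented format.

```python
import json

# Trimmed copy of the example `jaws status` output.
STATUS_JSON = """
{
  "id": 109288,
  "status": "running",
  "result": null,
  "tag": "nmdc:dgns-11-sm8wyy89/nmdc:wfrqc-11-7fgdsy18.1",
  "workflow_name": "nmdc_rqcfilter"
}
"""


def summarize_status(raw: str) -> dict:
    """Pick out the fields most useful for a quick check (hypothetical helper)."""
    status = json.loads(raw)
    # Assumption: the tag has the form "<first ID>/<second ID>", as in the example.
    tag_prefix, _, tag_suffix = status["tag"].partition("/")
    return {
        "jaws_id": status["id"],
        "status": status["status"],
        "workflow_name": status["workflow_name"],
        "tag_prefix": tag_prefix,
        "tag_suffix": tag_suffix,
    }


summary = summarize_status(STATUS_JSON)
```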
- Query the jobs table in the NMDC database based on was_informed_by a specific DataGeneration ID
db.getCollection("jobs").find({ "config.was_informed_by": "nmdc:omprc-11-sdyccb57" })
Similarly, you can query workflow_executions to find results based on was_informed_by a specific DataGeneration ID
db.getCollection("workflow_execution_set").find({ "was_informed_by": "nmdc:omprc-11-sdyccb57" })
- Job document example
Example database entry
{
  "workflow": {
    "id": "Metagenome Assembly: v1.0.9"
  },
  "id": "nmdc:9380c834-fab7-11ef-b4bd-0a13321f5970",
  "created_at": "2025-03-06T18:19:43.000+0000",
  "config": {
    "git_repo": "https://github.com/microbiomedata/metaAssembly",
    "release": "v1.0.9",
    "wdl": "jgi_assembly.wdl",
    "activity_id": "nmdc:wfmgas-12-k8dxr170.1",
    "activity_set": "workflow_execution_set",
    "was_informed_by": "nmdc:omprc-11-sdyccb57",
    "trigger_activity": "nmdc:wfrqc-12-dvn15085.1",
    "iteration": 1,
    "input_prefix": "jgi_metaAssembly",
    "inputs": {
      "input_files": "https://data.microbiomedata.org/data/nmdc:omprc-11-sdyccb57/nmdc:wfrqc-12-dvn15085.1/nmdc_wfrqc-12-dvn15085.1_filtered.fastq.gz",
      "proj": "nmdc:wfmgas-12-k8dxr170.1",
      "shortRead": false
    },
    "input_data_objects": [],
    "activity": {},
    "outputs": []
  },
  "claims": []
}
Things to note:
- config.was_informed_by is the DataGeneration ID that is the root of this job
- config.trigger_activity is the WorkflowExecution ID that triggered this job
- config.inputs are the inputs to the job
- claims is a list of workers that have claimed the job. If this list is empty, the job is available to be claimed. If the list is not empty, the job is being processed by a worker - example: { "op_id" : "nmdc:sys0z232qf64", "site_id" : "NERSC" }
This refers to the operation and site that is processing the job.
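The claims rule can be expressed as a short check on a job document. The following is a minimal sketch that uses only the id, claims, op_id, and site_id fields shown in the example; the helper names is_available and claiming_sites are hypothetical.

```python
def is_available(job: dict) -> bool:
    """A job with an empty (or missing) claims list is available to be claimed."""
    return not job.get("claims")


def claiming_sites(job: dict) -> list[str]:
    """List the site_id of every claim recorded on the job."""
    return [claim["site_id"] for claim in job.get("claims", [])]


# Reduced job documents based on the example above.
unclaimed = {"id": "nmdc:9380c834-fab7-11ef-b4bd-0a13321f5970", "claims": []}
claimed = {
    **unclaimed,
    "claims": [{"op_id": "nmdc:sys0z232qf64", "site_id": "NERSC"}],
}
```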
The Watcher maintains a state file with job configuration, metadata and status information. The location of the
state file is defined in the site configuration file. For dev this location is:
/global/cfs/cdirs/m3408/var/dev/agent.state
Example State File Entry
{
"workflow": {
"id": "Metagenome Assembly: v1.0.9"
},
"created_at": "2025-03-06T18:19:43",
"config": {
"git_repo": "https://github.com/microbiomedata/metaAssembly",
"release": "v1.0.9",
"wdl": "jgi_assembly.wdl",
"activity_id": "nmdc:wfmgas-12-k8dxr170.1",
"activity_set": "workflow_execution_set",
"was_informed_by": "nmdc:omprc-11-sdyccb57",
"trigger_activity": "nmdc:wfrqc-12-dvn15085.1",
"iteration": 1,
"input_prefix": "jgi_metaAssembly",
"inputs": {
"input_files": "https://data.microbiomedata.org/data/nmdc:omprc-11-sdyccb57/nmdc:wfrqc-12-dvn15085.1/nmdc_wfrqc-12-dvn15085.1_filtered.fastq.gz",
"proj": "nmdc:wfmgas-12-k8dxr170.1",
"shortRead": false
},
"input_data_objects": [],
"activity": {},
"outputs": []
},
"claims": [],
"opid": "nmdc:sys0z232qf64",
"done": true,
"start": "2025-03-06T19:24:52.176365+00:00",
"jaws_jobid": "0b138671-824d-496a-b681-24fb6cb207b3",
"last_status": "Failed",
"nmdc_jobid": 147004,
"failed_count": 3
}
Similar to a jobs record, with these additional things to note:
- done is a boolean indicating if the job is complete
- jaws_jobid is the job ID from JAWS service
- last_status is the last known status of the job - this is updated by the watcher
- failed_count is the number of times the job has failed
By default, the Watcher will retry a failed job 1 additional time via jaws submit.
If the job fails again, the Watcher will mark the job as done and update the status to Failed.
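The retry rule described here can be sketched as a small decision function over a state-file-style entry. This is an illustration under stated assumptions, not the Watcher's code: the name handle_failure, the max_retries parameter, and the returned action strings are hypothetical, and the way the real Watcher increments failed_count may differ.

```python
MAX_RETRIES = 1  # assumed constant: one additional attempt by default, per the text above


def handle_failure(entry: dict, max_retries: int = MAX_RETRIES) -> str:
    """Update a state-file-style entry after a failed run and return the action taken."""
    entry["failed_count"] = entry.get("failed_count", 0) + 1
    if entry["failed_count"] <= max_retries:
        entry["done"] = False
        return "resubmit"  # the Watcher would resubmit via jaws submit
    # Retries exhausted: mark the job as done with a Failed status.
    entry["done"] = True
    entry["last_status"] = "Failed"
    return "mark_failed"


entry = {"done": False, "failed_count": 0, "last_status": "Running"}
first = handle_failure(entry)   # first failure: retry once
second = handle_failure(entry)  # second failure: give up
```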
Some things to note:
- Jobs that failed because of a transient, incomplete data download may be resolved by running the jaws download $jaws_jobid command.
- For jobs that failed due to system errors and need to be resubmitted, if the JAWS job itself cannot be resubmitted, use the API release endpoint to mark the claimed job as failed and have JAWS resubmit it. This adds one entry to the claims array in the jobs record.