diff --git a/api/src/wfl/api/handlers.clj b/api/src/wfl/api/handlers.clj
index b40751fbe..b277bdc65 100644
--- a/api/src/wfl/api/handlers.clj
+++ b/api/src/wfl/api/handlers.clj
@@ -84,7 +84,7 @@
"Start the workload with UUID in REQUEST."
[request]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
- (let [{uuid :uuid} (get-in request [:parameters :body])]
+ (let [{uuid :uuid} (:body-params request)]
(logr/infof "post-start endpoint called: uuid=%s" uuid)
(let [workload (workloads/load-workload-for-uuid tx uuid)]
(->>
@@ -99,7 +99,7 @@
"Create and start workload described in BODY of REQUEST"
[request]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
- (let [workload-request (get-in request [:parameters :body])]
+ (let [workload-request (:body-params request)]
(logr/info "executing workload-request: " workload-request)
(->>
(gcs/userinfo request)
diff --git a/api/src/wfl/api/spec.clj b/api/src/wfl/api/spec.clj
index 8506d353e..c6281d396 100644
--- a/api/src/wfl/api/spec.clj
+++ b/api/src/wfl/api/spec.clj
@@ -19,7 +19,6 @@
(s/def ::input_bam #(str/ends-with? % ".bam"))
(s/def ::input_cram #(str/ends-with? % ".cram"))
(s/def ::output string?)
-(s/def ::common_inputs map?)
(s/def ::pipeline string?)
(s/def ::project string?)
(s/def ::release string?)
@@ -31,11 +30,11 @@
(s/def ::uuid-query (s/keys :opt-un [::uuid]))
(s/def ::version string?)
(s/def ::wdl string?)
-(s/def ::workflow_options map?)
-(s/def ::workload-request (s/keys :opt-un [::input
- ::items
- ::common_inputs
- ::workflow_options]
+(s/def ::options map?)
+(s/def ::common map?)
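+;; ::common holds workload-wide :inputs and :options that modules merge into each workflow item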
+(s/def ::workload-request (s/keys :opt-un [::common
+ ::input
+ ::items]
:req-un [::cromwell
::output
::pipeline
@@ -60,7 +59,7 @@
;; compound
(s/def ::items (s/* ::workload-inputs))
(s/def ::workload-inputs (s/keys :req-un [::inputs]
- :opt-un [::workflow_options]))
+ :opt-un [::options]))
(s/def ::inputs (s/or :aou ::aou-workflow-inputs
:copyfile ::copyfile-workflow-inputs
:wgs ::wgs-workflow-inputs
@@ -69,7 +68,7 @@
(s/def ::workflows (s/* ::workflow))
(s/def ::workflow
(s/keys :req-un [::inputs]
- :opt-un [::status ::updated ::uuid ::workflow_options]))
+ :opt-un [::status ::updated ::uuid ::options]))
;; aou
(s/def ::analysis_version_number integer?)
diff --git a/api/src/wfl/api/workloads.clj b/api/src/wfl/api/workloads.clj
index 3f34317e9..a98ff3a7c 100644
--- a/api/src/wfl/api/workloads.clj
+++ b/api/src/wfl/api/workloads.clj
@@ -1,6 +1,7 @@
(ns wfl.api.workloads
- (:require [wfl.service.postgres :as postgres]
+ (:require [clojure.string :as str]
[wfl.jdbc :as jdbc]
+ [wfl.service.postgres :as postgres]
[wfl.util :as util]))
;; always derive from base :wfl/exception
@@ -110,14 +111,12 @@
[tx workload]
(letfn [(unnilify [m] (into {} (filter second m)))
(split-inputs [m]
- (let [keep [:id :finished :status :updated :uuid :workflow_options]]
- (assoc (select-keys m keep)
- :inputs (unnilify (apply dissoc m keep)))))
- (unpack-options [m]
- (update m :workflow_options #(when % (util/parse-json %))))]
+ (let [keep [:id :finished :status :updated :uuid :options]]
+ (assoc (select-keys m keep) :inputs (apply dissoc m keep))))
+ (load-options [m] (update m :options (fnil util/parse-json "null")))]
(try
(->> (postgres/get-table tx (:items workload))
- (mapv (comp unnilify split-inputs unpack-options))
+ (mapv (comp unnilify split-inputs load-options))
(assoc workload :workflows)
unnilify)
(catch Throwable cause
@@ -125,3 +124,17 @@
{:workload workload} cause))))))
(defoverload load-workload-impl :default default-load-workload-impl)
+
+;; Common workload operations
+(defn saved-before?
+ "Test if the `_workload` was saved before the `reference` version string.
+ Version strings must be in the form \"major.minor.patch\"."
+ [reference {:keys [version] :as _workload}]
+ (letfn [(decode [v] (map util/parse-int (str/split v #"\.")))
+ (validate [v] (when (not= 3 (count v))
+ (throw (ex-info "malformed version string"
+ {:version (str/join "." v)})))
+ v)
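+           ;; compare version components left to right, e.g. (0 3 8) < (0 4 0) < (1 0 0)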
+ (lt? [[x & xs] [y & ys]]
+ (or (< x y) (and (== x y) (every? some? [xs ys]) (lt? xs ys))))]
+ (util/on lt? (comp validate decode) version reference)))
diff --git a/api/src/wfl/module/batch.clj b/api/src/wfl/module/batch.clj
index 5582aba22..9cbbd3bb0 100644
--- a/api/src/wfl/module/batch.clj
+++ b/api/src/wfl/module/batch.clj
@@ -2,6 +2,9 @@
"Some utilities shared between batch workloads in cromwell."
(:require [clojure.pprint :refer [pprint]]
[wfl.jdbc :as jdbc]
+ [wfl.module.all :as all]
+ [wfl.service.postgres :as postgres]
+ [wfl.util :as util]
[wfl.wfl :as wfl])
(:import [java.util UUID]))
@@ -12,17 +15,29 @@
[tx {:keys [release top] :as _workflow-wdl} {:keys [pipeline] :as workload-request}]
(let [[{:keys [id]}]
(-> workload-request
- (select-keys [:creator :cromwell :input :output :project])
- (merge (-> (wfl/get-the-version) (select-keys [:commit :version])))
- (assoc :release release
- :wdl top
- :uuid (UUID/randomUUID))
- (->> (jdbc/insert! tx :workload)))
+ (select-keys [:creator :cromwell :input :output :project])
+ (update :cromwell all/de-slashify)
+ (merge (select-keys (wfl/get-the-version) [:commit :version]))
+ (assoc :release release :wdl top :uuid (UUID/randomUUID))
+ (->> (jdbc/insert! tx :workload)))
table (format "%s_%09d" pipeline id)]
(jdbc/execute! tx
- ["UPDATE workload SET pipeline = ?::pipeline WHERE id = ?" pipeline id])
+ ["UPDATE workload SET pipeline = ?::pipeline WHERE id = ?" pipeline id])
(jdbc/db-do-commands tx
- (map #(format "CREATE TABLE %s OF CromwellWorkflow (PRIMARY KEY (id))" %)
- [table]))
+ (map #(format "CREATE TABLE %s OF CromwellWorkflow (PRIMARY KEY (id))" %)
+ [table]))
(jdbc/update! tx :workload {:items table} ["id = ?" id])
[id table]))
+
+(defn load-batch-workload-impl
+ "Use transaction `tx` to load and associate the workflows in the `workload`
+ stored in a CromwellWorkflow table."
+ [tx {:keys [items] :as workload}]
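+  ;; :inputs and :options are stored as JSON text columns; parse them when loading each row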
+ (letfn [(unnilify [m] (into {} (filter second m)))
+ (load-inputs [m] (update m :inputs (fnil util/parse-json "null")))
+ (load-options [m] (update m :options (fnil util/parse-json "null")))]
+ (->> (postgres/get-table tx items)
+ (mapv (comp unnilify load-options load-inputs))
+ (assoc workload :workflows)
+ load-options
+ unnilify)))
diff --git a/api/src/wfl/module/copyfile.clj b/api/src/wfl/module/copyfile.clj
index 6fcd4fa7c..b3e713184 100644
--- a/api/src/wfl/module/copyfile.clj
+++ b/api/src/wfl/module/copyfile.clj
@@ -1,9 +1,10 @@
(ns wfl.module.copyfile
"A dummy module for smoke testing wfl/cromwell auth."
(:require [clojure.data.json :as json]
- [wfl.api.workloads :as workloads]
+ [wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
+ [wfl.module.batch :as batch]
[wfl.service.cromwell :as cromwell]
[wfl.util :as util])
(:import [java.time OffsetDateTime]))
@@ -14,53 +15,57 @@
{:release "copyfile-v1.0"
:top "wdl/copyfile.wdl"})
-(defn- submit-workflow
+(defn ^:private get-cromwell-environment [{:keys [cromwell]}]
+ (let [envs (all/cromwell-environments #{:gotc-dev :gotc-prod} cromwell)]
+ (when (not= 1 (count envs))
+ (throw (ex-info "no unique environment matching Cromwell URL."
+ {:cromwell cromwell
+ :environments envs})))
+ (first envs)))
+
+(defn ^:private submit-workflow
"Submit WORKFLOW to Cromwell in ENVIRONMENT with OPTIONS and LABELS."
- [environment workflow options labels]
+ [environment inputs options labels]
(cromwell/submit-workflow
environment
(util/extract-resource (:top workflow-wdl))
nil
- (-> workflow (select-keys [:src :dst]) (util/prefix-keys pipeline))
+ (-> inputs (util/prefix-keys pipeline))
options
labels))
-(defn add-copyfile-workload!
- "Use transaction TX to add the workload described by WORKLOAD-REQUEST."
- [tx {:keys [items] :as _workload-request}]
- (let [workflow-options (-> (:cromwell _workload-request)
- all/cromwell-environments
- first
- util/make-options
- (util/deep-merge (:workflow_options _workload-request)))
- [id table] (all/add-workload-table! tx workflow-wdl _workload-request)
- to-row (fn [item] (assoc (:inputs item)
- :workflow_options
- (json/write-str (util/deep-merge workflow-options (:workflow_options item)))))]
- (letfn [(add-id [m id] (assoc m :id id))]
- (jdbc/insert-multi! tx table (map add-id (map to-row items) (range))))
- id))
+(defn create-copyfile-workload!
+ "Use transaction TX to add the workload described by REQUEST."
+ [tx {:keys [items common] :as request}]
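+  ;; deep-merge each workflow's :inputs and :options over the workload-level :common values before serializing to JSON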
+ (letfn [(merge-to-json [shared specific]
+ (json/write-str (util/deep-merge shared specific)))
+ (serialize [workflow id]
+ (-> workflow
+ (assoc :id id)
+ (update :inputs #(merge-to-json (:inputs common) %))
+ (update :options #(merge-to-json (:options common) %))))]
+ (let [[id table] (batch/add-workload-table! tx workflow-wdl request)]
+ (jdbc/insert-multi! tx table (map serialize items (range)))
+ (workloads/load-workload-for-id tx id))))
(defn start-copyfile-workload!
"Use transaction TX to start _WORKLOAD."
- [tx {:keys [cromwell items uuid] :as workload}]
- (let [env (first (all/cromwell-environments cromwell))]
- (letfn [(submit! [{:keys [id inputs workflow_options]}]
- [id (submit-workflow env inputs workflow_options {:workload uuid}) "Submitted"])
- (update! [tx [id uuid status]]
+ [tx {:keys [items uuid] :as workload}]
+ (let [env (get-cromwell-environment workload)
+ default-options (util/make-options env)]
+ (letfn [(submit! [{:keys [id inputs options]}]
+ [id (submit-workflow env inputs
+ (util/deep-merge default-options options)
+ {:workload uuid})])
+ (update! [tx [id uuid]]
(jdbc/update! tx items
- {:updated (OffsetDateTime/now) :uuid uuid :status status}
+ {:updated (OffsetDateTime/now) :uuid uuid :status "Submitted"}
["id = ?" id]))]
(run! (comp (partial update! tx) submit!) (:workflows workload))
(jdbc/update! tx :workload
{:started (OffsetDateTime/now)} ["uuid = ?" uuid]))))
-(defmethod workloads/create-workload!
- pipeline
- [tx request]
- (->>
- (add-copyfile-workload! tx request)
- (workloads/load-workload-for-id tx)))
+(defoverload workloads/create-workload! pipeline create-copyfile-workload!)
(defmethod workloads/start-workload!
pipeline
@@ -68,3 +73,10 @@
(do
(start-copyfile-workload! tx workload)
(workloads/load-workload-for-id tx id)))
+
+(defmethod workloads/load-workload-impl
+ pipeline
+ [tx workload]
+ (if (workloads/saved-before? "0.4.0" workload)
+ (workloads/default-load-workload-impl tx workload)
+ (batch/load-batch-workload-impl tx workload)))
diff --git a/api/src/wfl/module/wgs.clj b/api/src/wfl/module/wgs.clj
index 49b4d35b7..abedcce00 100644
--- a/api/src/wfl/module/wgs.clj
+++ b/api/src/wfl/module/wgs.clj
@@ -2,16 +2,13 @@
"Reprocess (External) Whole Genomes."
(:require [clojure.java.io :as io]
[clojure.data.json :as json]
- [clojure.string :as str]
- [clojure.tools.logging.readable :as logr]
- [wfl.api.workloads :as workloads]
+ [wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.environments :as env]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
[wfl.references :as references]
[wfl.service.cromwell :as cromwell]
[wfl.service.gcs :as gcs]
- [wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wdl :as wdl]
[wfl.wfl :as wfl]
@@ -35,10 +32,13 @@
(let [[key value] (first cromwell-label-map)]
(str (name key) ":" value)))
-(def get-cromwell-wgs-environment
- "Transduce Cromwell URL to a :wgs environment."
- (comp first (partial all/cromwell-environments
- #{:wgs-dev :wgs-prod :wgs-staging})))
+(defn ^:private get-cromwell-environment [{:keys [cromwell]}]
+ (let [envs (all/cromwell-environments #{:wgs-dev :wgs-prod} cromwell)]
+ (when (not= 1 (count envs))
+ (throw (ex-info "no unique environment matching Cromwell URL."
+ {:cromwell cromwell
+ :environments envs})))
+ (first envs)))
(def cram-ref
"Ref Fasta for CRAM."
@@ -59,13 +59,8 @@
(def hack-task-level-values
"Hack to overload task-level values for wgs pipeline."
(let [hg38 "gs://gcp-public-data--broad-references/hg38/v0/"]
- (merge {:wgs_coverage_interval_list
- (str hg38 "wgs_coverage_regions.hg38.interval_list")}
- (-> {:disable_sanity_check true}
- (util/prefix-keys :CheckContamination)
- (util/prefix-keys :UnmappedBamToAlignedBam)
- (util/prefix-keys :WholeGenomeGermlineSingleSample)
- (util/prefix-keys :WholeGenomeReprocessing)))))
+ {:wgs_coverage_interval_list
+ (str hg38 "wgs_coverage_regions.hg38.interval_list")}))
(defn env-inputs
"Genome inputs for ENVIRONMENT that do not depend on the input file."
@@ -79,121 +74,114 @@
:scatter_settings {:haplotype_scatter_count 10
:break_bands_at_multiples_of 100000}}))
-(defn make-inputs
+(defn ^:private normalize-references [inputs]
+ (update inputs :references
+ #(util/deep-merge
+ (-> inputs :reference_fasta_prefix make-references)
+ %)))
+
+(defn ^:private make-inputs-to-save
+  "Return the workflow INPUTS to save for reprocessing into OUT-GS."
+ [out-gs inputs]
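+  ;; fill in default sample/base/gvcf names and the destination path from the input BAM/CRAM location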
+ (let [sample (some inputs [:input_bam :input_cram])
+ [_ base _] (all/bam-or-cram? sample)
+ leaf (util/basename base)
+ [_ out-dir] (gcs/parse-gs-url (util/unsuffix base leaf))]
+ (-> inputs
+ (util/assoc-when util/absent? :base_file_name leaf)
+ (util/assoc-when util/absent? :sample_name leaf)
+ (util/assoc-when util/absent? :unmapped_bam_suffix ".unmapped.bam")
+ (util/assoc-when util/absent? :final_gvcf_base_name leaf)
+ (assoc :destination_cloud_path (str out-gs out-dir)))))
+
+(defn ^:private make-cromwell-inputs
"Return inputs for reprocessing IN-GS into OUT-GS in ENVIRONMENT."
- [environment out-gs in-gs sample]
- (let [[input-key base _] (all/bam-or-cram? in-gs)
- leaf (last (str/split base #"/"))
- [_ out-dir] (gcs/parse-gs-url (util/unsuffix base leaf))
- ref-prefix (get sample :reference_fasta_prefix)
- final_gvcf_base_name (or (:final_gvcf_base_name sample) leaf)
- inputs (-> {}
- (assoc :base_file_name (or (:base_file_name sample) leaf))
- (assoc :sample_name (or (:sample_name sample) leaf))
- (assoc :unmapped_bam_suffix (or (:unmapped_bam_suffix sample) ".unmapped.bam"))
- (assoc :final_gvcf_base_name final_gvcf_base_name)
- (assoc input-key in-gs)
- (assoc :destination_cloud_path (str out-gs out-dir))
- (assoc :references (make-references ref-prefix))
- (merge cram-ref)
- (merge (env-inputs environment)
- hack-task-level-values))
- output (str (:destination_cloud_path inputs)
- final_gvcf_base_name
- ".cram")]
- (all/throw-when-output-exists-already! output)
- (util/prefix-keys inputs :ExternalWholeGenomeReprocessing)))
+  "Return the Cromwell inputs for WORKFLOW-INPUTS in ENVIRONMENT."
+  [environment workflow-inputs]
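+  ;; later merges win: per-workflow inputs override the environment and default values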
+ (-> (util/deep-merge cram-ref hack-task-level-values)
+ (util/deep-merge (env-inputs environment))
+ (util/deep-merge workflow-inputs)
+ (util/prefix-keys (keyword pipeline))))
(defn make-labels
"Return labels for wgs pipeline from OTHER-LABELS."
[other-labels]
- (merge cromwell-label-map
- other-labels))
+ (merge cromwell-label-map other-labels))
(defn really-submit-one-workflow
"Submit IN-GS for reprocessing into OUT-GS in ENVIRONMENT given OTHER-LABELS."
- [environment in-gs out-gs sample options other-labels]
+ [environment inputs options other-labels]
(let [path (wdl/hack-unpack-resources-hack (:top workflow-wdl))]
- (logr/infof "submitting workflow with: in-gs: %s, out-gs: %s" in-gs out-gs)
(cromwell/submit-workflow
environment
(io/file (:dir path) (path ".wdl"))
(io/file (:dir path) (path ".zip"))
- (make-inputs environment out-gs in-gs sample)
+ (make-cromwell-inputs environment inputs)
options
(make-labels other-labels))))
-(defn add-wgs-workload!
- "Use transaction TX to add the workload described by WORKLOAD-REQUEST."
- [tx {:keys [items] :as workload-request}]
- (let [workflow-options (-> (:cromwell workload-request)
- all/de-slashify
- get-cromwell-wgs-environment
- util/make-options
- (util/deep-merge (:workflow_options workload-request)))
- [id table] (batch/add-workload-table! tx workflow-wdl workload-request)]
- (letfn [(form [m id] (-> m
- (update :inputs json/write-str)
- (update :workflow_options #(json/write-str (util/deep-merge workflow-options %)))
- (assoc :id id)))]
- (jdbc/insert-multi! tx table (map form items (range)))
- id)))
+(defn create-wgs-workload!
+ "Use transaction TX to add the workload described by REQUEST."
+ [tx {:keys [items output common] :as request}]
+ (letfn [(serialize [workflow id]
+ (-> (assoc workflow :id id)
+ (update :options
+ #(json/write-str (util/deep-merge (:options common) %)))
+ (update :inputs
+ #(json/write-str
+ (normalize-references
+ (util/deep-merge
+ (:inputs common)
+ (make-inputs-to-save output %)))))))]
+ (let [[id table] (batch/add-workload-table! tx workflow-wdl request)]
+ (jdbc/insert-multi! tx table (map serialize items (range)))
+ (workloads/load-workload-for-id tx id))))
(defn skip-workflow?
"True when _WORKFLOW in _WORKLOAD in ENV is done or active."
[env
- {:keys [input output] :as _workload}
- {:keys [input_cram] :as _workflow}]
- (let [in-gs (str (all/slashify input) input_cram)
- out-gs (str (all/slashify output) input_cram)]
- (or (->> out-gs gcs/parse-gs-url
- (apply gcs/list-objects)
- util/do-or-nil seq) ; done?
- (->> {:label cromwell-label
- :status ["On Hold" "Running" "Submitted"]}
- (cromwell/query env)
- (map (comp :ExternalWholeGenomeReprocessing.input_cram
- (fn [it] (json/read-str it :key-fn keyword))
- :inputs :submittedFiles
- (partial cromwell/metadata env)
- :id))
- (keep #{in-gs})
- seq)))) ; active?
+ {:keys [output] :as _workload}
+ {:keys [inputs] :as _workflow}]
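+  ;; skip when the expected output already exists or an active Cromwell workflow is processing the input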
+ (letfn [(exists? [out-gs] (->> (gcs/parse-gs-url out-gs)
+ (apply gcs/list-objects)
+ util/do-or-nil
+ seq))
+ (processing? [in-gs]
+ (->> {:label cromwell-label :status cromwell/active-statuses}
+              (cromwell/query env)
+ (filter #(= pipeline (:name %)))
+ (map #(->> % :id (cromwell/metadata env) :inputs))
+ (map #(some % [:input_bam :input_cram]))
+ (filter #{in-gs})
+ seq))]
+ (let [in-gs (some inputs [:input_bam :input_cram])
+ [_ object] (gcs/parse-gs-url in-gs)
+ out-gs (str (all/slashify output) object)]
+ (or (exists? out-gs) (processing? in-gs)))))
(defn start-wgs-workload!
"Use transaction TX to start the WORKLOAD."
- [tx {:keys [cromwell input items output uuid] :as workload}]
- (let [env (get-cromwell-wgs-environment (all/de-slashify cromwell))
- input (all/slashify input)
- output (all/slashify output)
- now (OffsetDateTime/now)
+ [tx {:keys [items uuid] :as workload}]
+ (let [env (get-cromwell-environment workload)
+ default-options (util/make-options env)
workload->label {:workload uuid}]
- (letfn [(submit! [{:keys [id inputs uuid workflow_options] :as workflow}]
- [id (or uuid
- (if (skip-workflow? env workload workflow)
- util/uuid-nil
- (really-submit-one-workflow
- env
- (str input (some inputs [:input_bam :input_cram]))
- output
- inputs
- workflow_options
- workload->label)))])
- (update! [tx [id uuid]]
- (when uuid
- (jdbc/update! tx items
- {:updated now :uuid uuid}
- ["id = ?" id])))]
- (let [ids-uuids (map submit! (:workflows workload))]
- (run! (partial update! tx) ids-uuids)
+ (letfn [(submit! [{:keys [id inputs options] :as workflow}]
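+              ;; workflows that are already done or in flight are marked "skipped" with the nil UUID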
+ (if (skip-workflow? env workload workflow)
+ [id "skipped" util/uuid-nil]
+ [id "Submitted"
+ (really-submit-one-workflow
+ env
+ inputs
+ (util/deep-merge default-options options)
+ workload->label)]))
+ (update! [tx [id status uuid]]
+ (jdbc/update! tx items
+ {:status status :updated (OffsetDateTime/now) :uuid uuid}
+ ["id = ?" id]))]
+ (let [now (OffsetDateTime/now)]
+ (run! (comp (partial update! tx) submit!) (:workflows workload))
(jdbc/update! tx :workload {:started now} ["uuid = ?" uuid])))))
-(defmethod workloads/create-workload!
- pipeline
- [tx request]
- (->>
- (add-wgs-workload! tx request)
- (workloads/load-workload-for-id tx)))
+(defoverload workloads/create-workload! pipeline create-wgs-workload!)
(defmethod workloads/start-workload!
pipeline
@@ -202,23 +190,9 @@
(start-wgs-workload! tx workload)
(workloads/load-workload-for-id tx id)))
-;; visible for testing
-(defn uses-cromwell-workload-table? [{:keys [version]}]
- (let [[major minor & _] (mapv util/parse-int (str/split version #"\."))]
- (or (< 0 major) (< 3 minor))))
-
(defmethod workloads/load-workload-impl
pipeline
- [tx {:keys [items] :as workload}]
- (if-not (uses-cromwell-workload-table? workload)
+ [tx workload]
+ (if (workloads/saved-before? "0.4.0" workload)
(workloads/default-load-workload-impl tx workload)
- (letfn [(unnilify [m] (into {} (filter second m)))
- (unpack-options [m]
- (update m :workflow_options #(when % (util/parse-json %))))]
- (->> (postgres/get-table tx items)
- (mapv (comp unpack-options
- #(update % :inputs util/parse-json)
- unnilify))
- (assoc workload :workflows)
- unpack-options
- unnilify))))
+ (batch/load-batch-workload-impl tx workload)))
diff --git a/api/src/wfl/module/xx.clj b/api/src/wfl/module/xx.clj
index 2650bbbd7..5fffcaf47 100644
--- a/api/src/wfl/module/xx.clj
+++ b/api/src/wfl/module/xx.clj
@@ -3,7 +3,6 @@
(:require [clojure.data.json :as json]
[clojure.java.io :as io]
[clojure.string :as str]
- [clojure.tools.logging.readable :as logr]
[wfl.api.workloads :refer [defoverload]]
[wfl.api.workloads :as workloads]
[wfl.environments :as env]
@@ -36,7 +35,7 @@
hsa "Homo_sapiens_assembly38"]
(merge references/hg38-exome-references
references/contamination-sites
- {:calling_interval_list (str hg38 "exome_calling_regions.v1.interval_list")
+ {:calling_interval_list (str hg38 "exome_calling_regions.v1.interval_list")
:haplotype_database_file (str hg38 hsa ".haplotype_database.txt")})))
(def ^:private workflow-defaults
@@ -52,7 +51,7 @@
:bait_interval_list (str hg38 iv1 ".baits.interval_list")
:target_interval_list (str hg38 iv1 ".targets.interval_list")
:references references-defaults
- :scatter_settings {:break_bands_at_multiples_of 0
+ :scatter_settings {:break_bands_at_multiples_of 0
:haplotype_scatter_count 50}
:papi_settings {:agg_preemptible_tries 3
:preemptible_tries 3}}))
@@ -67,20 +66,18 @@
:environments envs})))
(first envs)))
-(defn ^:private cromwellify-inputs [environment inputs]
+(defn ^:private cromwellify-workflow-inputs [environment {:keys [inputs]}]
(-> (env/stuff environment)
(select-keys [:google_account_vault_path :vault_token_path])
(merge inputs)
(util/prefix-keys (keyword pipeline))))
;; visible for testing
-;; Note: the database stores per-workflow inputs so we need to combine
-;; any `common-inputs` with these before we commit them to storage.
-(defn make-combined-inputs-to-save [output-url common-inputs inputs]
+(defn make-inputs-to-save [output-url inputs]
(let [sample-name (fn [basename] (first (str/split basename #"\.")))
[_ path] (gcs/parse-gs-url (some inputs [:input_bam :input_cram]))
basename (or (:base_file_name inputs) (util/basename path))]
- (-> (util/deep-merge common-inputs inputs)
+ (-> inputs
(assoc :base_file_name basename)
(util/assoc-when util/absent? :sample_name (sample-name basename))
(util/assoc-when util/absent? :final_gvcf_base_name basename)
@@ -89,62 +86,61 @@
;; visible for testing
(defn submit-workload! [{:keys [uuid workflows] :as workload}]
- (let [path (wdl/hack-unpack-resources-hack (:top workflow-wdl))
- environment (get-cromwell-environment workload)
- ;; Batch calls have uniform options, so we must group by discrete options to submit
- workflows-by-options (seq (group-by :workflow_options workflows))]
+ (let [path (wdl/hack-unpack-resources-hack (:top workflow-wdl))
+ environment (get-cromwell-environment workload)
+ default-options (util/make-options environment)]
(letfn [(update-workflow [workflow cromwell-uuid]
(assoc workflow :uuid cromwell-uuid
:status "Submitted"
:updated (OffsetDateTime/now)))
- (submit-workflows-by-options [[options ws]]
- (mapv update-workflow
- ws
- (cromwell/submit-workflows
- environment
- (io/file (:dir path) (path ".wdl"))
- (io/file (:dir path) (path ".zip"))
- (map (comp (partial cromwellify-inputs environment) :inputs) ws)
- options
- (merge cromwell-labels {:workload uuid}))))]
- (apply concat (mapv submit-workflows-by-options workflows-by-options)))))
+ (submit-batch! [[options workflows]]
+ (map update-workflow
+ workflows
+ (cromwell/submit-workflows
+ environment
+ (io/file (:dir path) (path ".wdl"))
+ (io/file (:dir path) (path ".zip"))
+ (map (partial cromwellify-workflow-inputs environment) workflows)
+ (util/deep-merge default-options options)
+ (merge cromwell-labels {:workload uuid}))))]
+ ;; Group by discrete options to batch submit
+ (mapcat submit-batch! (group-by :options workflows)))))
-(defn create-xx-workload! [tx {:keys [output common_inputs items] :as request}]
- (letfn [(make-workflow-record [workload-options id item]
- (let [options-string (json/write-str (util/deep-merge workload-options (:workflow_options item)))]
- (->> (make-combined-inputs-to-save output common_inputs (:inputs item))
- json/write-str
- (assoc {:id id :workflow_options options-string} :inputs))))]
- (let [workflow-options (util/deep-merge (util/make-options (get-cromwell-environment request))
- (:workflow_options request))
- [id table] (batch/add-workload-table! tx workflow-wdl request)]
- (->> (map (partial make-workflow-record workflow-options) (range) items)
- (jdbc/insert-multi! tx table))
+(defn create-xx-workload!
+ [tx {:keys [common items output] :as request}]
+ (letfn [(merge-to-json [shared specific]
+ (json/write-str (util/deep-merge shared specific)))
+ (serialize [item id]
+ (-> item
+ (assoc :id id)
+ (update :options #(merge-to-json (:options common) %))
+ (update :inputs #(merge-to-json (:inputs common)
+ (make-inputs-to-save output %)))))]
+ (let [[id table] (batch/add-workload-table! tx workflow-wdl request)]
+ (jdbc/insert-multi! tx table (map serialize items (range)))
(workloads/load-workload-for-id tx id))))
(defn start-xx-workload! [tx {:keys [items id] :as workload}]
- (if (:started workload)
- workload
- (letfn [(update-record! [{:keys [id] :as workflow}]
- (let [values (select-keys workflow [:uuid :status :updated])]
- (jdbc/update! tx items values ["id = ?" id])))]
- (let [now (OffsetDateTime/now)]
- (run! update-record! (submit-workload! workload))
- (jdbc/update! tx :workload {:started now} ["id = ?" id]))
- (workloads/load-workload-for-id tx id))))
+ (letfn [(update-record! [{:keys [id] :as workflow}]
+ (let [values (select-keys workflow [:uuid :status :updated])]
+ (jdbc/update! tx items values ["id = ?" id])))]
+ (let [now (OffsetDateTime/now)]
+ (run! update-record! (submit-workload! workload))
+ (jdbc/update! tx :workload {:started now} ["id = ?" id]))
+ (workloads/load-workload-for-id tx id)))
(defmethod workloads/load-workload-impl
pipeline
[tx {:keys [items] :as workload}]
(letfn [(unnilify [m] (into {} (filter second m)))
- (load-inputs! [workflow]
- (util/deep-merge workflow-defaults (util/parse-json workflow)))
- (unpack-options [m]
- (update m :workflow_options #(util/parse-json (or % "{}"))))]
+ (load-inputs [m]
+ (update m :inputs
+ #(util/deep-merge workflow-defaults (util/parse-json %))))
+ (load-options [m] (update m :options (fnil util/parse-json "null")))]
(->> (postgres/get-table tx items)
- (mapv (comp #(update % :inputs load-inputs!) unnilify unpack-options))
- (assoc workload :workflows)
- unnilify)))
+ (mapv (comp unnilify load-inputs load-options))
+ (assoc workload :workflows)
+ unnilify)))
(defoverload workloads/create-workload! pipeline create-xx-workload!)
(defoverload workloads/start-workload! pipeline start-xx-workload!)
diff --git a/api/test/wfl/integration/modules/copyfile_test.clj b/api/test/wfl/integration/modules/copyfile_test.clj
index d4f49a6b3..9accc6314 100644
--- a/api/test/wfl/integration/modules/copyfile_test.clj
+++ b/api/test/wfl/integration/modules/copyfile_test.clj
@@ -1,11 +1,14 @@
(ns wfl.integration.modules.copyfile-test
(:require [clojure.test :refer [deftest testing is] :as clj-test]
+ [clojure.string :as str]
+ [wfl.jdbc :as jdbc]
+ [wfl.module.all :as all]
+ [wfl.module.copyfile :as copyfile]
[wfl.service.cromwell :refer [wait-for-workflow-complete submit-workflow]]
[wfl.tools.endpoints :as endpoints]
[wfl.tools.fixtures :as fixtures]
[wfl.tools.workloads :as workloads]
- [wfl.util :as util]
- [wfl.module.all :as all])
+ [wfl.util :as util])
(:import (java.util UUID)))
(clj-test/use-fixtures :once fixtures/temporary-postgresql-database)
@@ -13,46 +16,67 @@
(defn ^:private make-copyfile-workload-request
[src dst]
(-> (workloads/copyfile-workload-request src dst)
- (assoc :creator (:email @endpoints/userinfo))))
+ (assoc :creator (:email @endpoints/userinfo))))
+
+(defn ^:private old-create-copyfile-workload! []
+ (let [request (make-copyfile-workload-request "gs://fake/input" "gs://fake/output")]
+ (jdbc/with-db-transaction [tx (fixtures/testing-db-config)]
+ (let [[id table] (all/add-workload-table! tx copyfile/workflow-wdl request)
+ add-id (fn [m id] (assoc (:inputs m) :id id))]
+ (jdbc/insert-multi! tx table (map add-id (:items request) (range)))
+ (jdbc/update! tx :workload {:version "0.3.8"} ["id = ?" id])
+ id))))
+
+(deftest test-loading-old-copyfile-workload
+  (let [id (old-create-copyfile-workload!)
+ workload (workloads/load-workload-for-id id)]
+ (is (= id (:id workload)))
+ (is (= copyfile/pipeline (:pipeline workload)))))
(deftest test-workflow-options
- (fixtures/with-temporary-gcs-folder
- uri
- (let [src (str uri "input.txt")
- dst (str uri "output.txt")
- option-sequence [:a :a :b]
- workload-request (-> (make-copyfile-workload-request src dst)
- (update :items (fn [existing]
- (mapv #(assoc %1 :workflow_options {%2 "some value"})
- (repeat (first existing)) option-sequence)))
- (assoc :workflow_options {:c "some other value"}))
- submitted-option-counts (atom {})
- ;; Mock cromwell/submit-workflow, count observed option keys per workflow
- pretend-submit (fn [_ _ _ _ options _]
- (run! #(swap! submitted-option-counts update % (fnil inc 0))
- (keys options))
- (str (UUID/randomUUID)))]
- (with-redefs-fn {#'submit-workflow pretend-submit}
- #(-> workload-request
- workloads/execute-workload!
- (as-> workload
- (testing "Options in server response"
- (is (= (count option-sequence)
- (count (filter (fn [w] (get-in w [:workflow_options :c])) (:workflows workload)))))
- (is (= (count (filter (partial = :a) option-sequence))
- (count (filter (fn [w] (get-in w [:workflow_options :a])) (:workflows workload)))))
- (is (= (count (filter (partial = :b) option-sequence))
- (count (filter (fn [w] (get-in w [:workflow_options :b])) (:workflows workload)))))
- (is (workloads/baseline-options-across-workload
- (-> (:cromwell workload)
- all/cromwell-environments
- first
- util/make-options)
- workload))))))
- (testing "Options sent to Cromwell"
- (is (= (count option-sequence)
- (:c @submitted-option-counts)))
- (is (= (count (filter (partial = :a) option-sequence))
- (:a @submitted-option-counts)))
- (is (= (count (filter (partial = :b) option-sequence))
- (:b @submitted-option-counts)))))))
+ (letfn [(verify-workflow-options [options]
+ (is (:supports_common_options options))
+ (is (:supports_options options))
+ (is (:overwritten options)))
+ (verify-submitted-options [env _ _ _ options _]
+ (let [defaults (util/make-options env)]
+ (verify-workflow-options options)
+ (is (= defaults (select-keys options (keys defaults))))
+ (UUID/randomUUID)))]
+ (with-redefs-fn {#'submit-workflow verify-submitted-options}
+ (fn []
+ (->
+ (make-copyfile-workload-request "gs://fake/input" "gs://fake/output")
+ (assoc-in [:common :options]
+ {:supports_common_options true :overwritten false})
+ (update :items
+ (partial map
+ #(assoc % :options {:supports_options true :overwritten true})))
+ workloads/execute-workload!
+ :workflows
+        (->> (run! (comp verify-workflow-options :options))))))))
+
+(deftest test-submitted-workflow-inputs
+ (letfn [(prefixed? [prefix key] (str/starts-with? (str key) (str prefix)))
+ (strip-prefix [[k v]]
+ [(keyword (util/unprefix (str k) ":copyfile."))
+ v])
+ (verify-workflow-inputs [inputs]
+ (is (:supports_common_inputs inputs))
+ (is (:supports_inputs inputs))
+ (is (:overwritten inputs)))
+ (verify-submitted-inputs [_ _ _ inputs _ _]
+ (is (every? #(prefixed? :copyfile %) (keys inputs)))
+ (verify-workflow-inputs (into {} (map strip-prefix inputs)))
+ (UUID/randomUUID))]
+ (with-redefs-fn {#'submit-workflow verify-submitted-inputs}
+ (fn []
+ (->
+ (make-copyfile-workload-request "gs://fake/foo" "gs://fake/bar")
+ (assoc-in [:common :inputs]
+ {:supports_common_inputs true :overwritten false})
+ (update :items
+ (partial map
+ #(update % :inputs
+ (fn [xs] (merge xs {:supports_inputs true :overwritten true})))))
+ workloads/execute-workload!)))))
diff --git a/api/test/wfl/integration/modules/wgs_test.clj b/api/test/wfl/integration/modules/wgs_test.clj
index 099c3e1e1..adf550f0e 100644
--- a/api/test/wfl/integration/modules/wgs_test.clj
+++ b/api/test/wfl/integration/modules/wgs_test.clj
@@ -8,9 +8,9 @@
[wfl.module.wgs :as wgs]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
- [clojure.string :as str]
[wfl.util :as util]
- [clojure.data.json :as json])
+ [wfl.references :as references]
+ [clojure.string :as str])
(:import (java.util UUID)))
(clj-test/use-fixtures :once fixtures/temporary-postgresql-database)
@@ -23,15 +23,40 @@
workloads/wgs-workload-request
(assoc :creator (:email @endpoints/userinfo))))
+(deftest test-create-with-common-reference-fasta-prefix
+ (let [prefix "gs://fake-input-bucket/ref-fasta"]
+ (letfn [(verify-reference-fasta [reference-fasta]
+ (is (= reference-fasta (references/reference_fasta prefix))))
+ (go! [inputs]
+ (verify-reference-fasta
+ (get-in inputs [:references :reference_fasta])))]
+ (run! (comp go! :inputs)
+ (-> (make-wgs-workload-request)
+ (assoc-in [:common :inputs] {:reference_fasta_prefix prefix})
+ workloads/create-workload!
+ :workflows)))))
+
+(deftest test-create-with-reference-fasta-prefix-override
+ (let [prefix "gs://fake-input-bucket/ref-fasta"]
+ (letfn [(verify-reference-fasta [reference-fasta]
+ (is (= reference-fasta (references/reference_fasta prefix))))
+ (go! [inputs]
+ (verify-reference-fasta
+ (get-in inputs [:references :reference_fasta])))]
+ (run! (comp go! :inputs)
+ (-> (make-wgs-workload-request)
+ (assoc-in [:common :inputs] {:reference_fasta_prefix "gs://ignore/this/ref-fasta"})
+ (update :items (partial map #(update % :inputs (fn [xs] (assoc xs :reference_fasta_prefix prefix)))))
+ workloads/create-workload!
+ :workflows)))))
+
(deftest test-start-wgs-workload!
(with-redefs-fn {#'wgs/really-submit-one-workflow mock-really-submit-one-workflow}
#(let [workload (-> (make-wgs-workload-request)
workloads/create-workload!
workloads/start-workload!)]
(letfn [(check-nesting [workflow]
- (is
- (= (:inputs workflow) workloads/wgs-inputs)
- "Inputs are under :inputs")
+ (is (:inputs workflow) "Inputs are under :inputs")
(is
(not-any? (partial contains? workflow) (keys workloads/wgs-inputs))
"Inputs are not at the top-level"))]
@@ -47,11 +72,10 @@
id))))
(deftest test-loading-old-wgs-workload
- (testing "loading a wgs workload saved in a previous release"
- (let [id (old-create-wgs-workload!)
- workload (workloads/load-workload-for-id id)]
- (is (= id (:id workload)))
- (is (= wgs/pipeline (:pipeline workload))))))
+ (let [id (old-create-wgs-workload!)
+ workload (workloads/load-workload-for-id id)]
+ (is (= id (:id workload)))
+ (is (= wgs/pipeline (:pipeline workload)))))
(deftest test-exec-with-input_bam
(letfn [(go! [workflow]
@@ -63,12 +87,11 @@
#(-> %
(dissoc :input_cram)
(assoc :input_bam "gs://inputs/fake.bam"))))
- (verify-use_input_bam! [env in out inputs options labels]
- (is (str/ends-with? in ".bam"))
+ (verify-use_input_bam! [env inputs options labels]
(is (contains? inputs :input_bam))
(is (util/absent? inputs :input_cram))
(is (contains? labels :workload))
- [env in out inputs options labels])]
+ [env inputs options labels])]
(with-redefs-fn
{#'wgs/really-submit-one-workflow
(comp mock-really-submit-one-workflow verify-use_input_bam!)}
@@ -79,37 +102,50 @@
(is (:started workload))
(run! go! (:workflows workload)))))))
+(deftest test-submitted-workflow-inputs
+ (letfn [(prefixed? [prefix key] (str/starts-with? (str key) (str prefix)))
+ (strip-prefix [[k v]]
+ [(keyword (util/unprefix (str k) ":ExternalWholeGenomeReprocessing."))
+ v])
+ (verify-workflow-inputs [inputs]
+ (is (:supports_common_inputs inputs))
+ (is (:supports_inputs inputs))
+ (is (:overwritten inputs)))
+ (verify-submitted-inputs [_ _ _ inputs _ _]
+ (is (every? #(prefixed? :ExternalWholeGenomeReprocessing %) (keys inputs)))
+ (verify-workflow-inputs (into {} (map strip-prefix inputs)))
+ (UUID/randomUUID))]
+ (with-redefs-fn {#'submit-workflow verify-submitted-inputs}
+ (fn []
+ (->
+ (make-wgs-workload-request)
+ (assoc-in [:common :inputs]
+ {:supports_common_inputs true :overwritten false})
+ (update :items
+ (partial map
+ #(update % :inputs
+ (fn [xs] (merge xs {:supports_inputs true :overwritten true})))))
+ workloads/execute-workload!)))))
+
(deftest test-workflow-options
- (let [option-sequence [:a :a :b]
- workload-request (-> (make-wgs-workload-request)
- (update :items (fn [existing]
- (mapv #(assoc %1 :workflow_options {%2 "some value"})
- (repeat (first existing)) option-sequence)))
- (assoc :workflow_options {:c "some other value"}))
- submitted-option-counts (atom {})
- ;; Mock cromwell/submit-workflow, count observed option keys per workflow
- pretend-submit (fn [_ _ _ _ options _]
- (run! #(swap! submitted-option-counts update % (fnil inc 0))
- (keys options))
- (str (UUID/randomUUID)))]
- (with-redefs-fn {#'submit-workflow pretend-submit}
- #(-> workload-request
- workloads/execute-workload!
- (as-> workload
- (testing "Options in server response"
- (is (= (count option-sequence)
- (count (filter (fn [w] (get-in w [:workflow_options :c])) (:workflows workload)))))
- (is (= (count (filter (partial = :a) option-sequence))
- (count (filter (fn [w] (get-in w [:workflow_options :a])) (:workflows workload)))))
- (is (= (count (filter (partial = :b) option-sequence))
- (count (filter (fn [w] (get-in w [:workflow_options :b])) (:workflows workload)))))
- (is (workloads/baseline-options-across-workload
- (util/make-options (wgs/get-cromwell-wgs-environment (:cromwell workload)))
- workload))))))
- (testing "Options sent to Cromwell"
- (is (= (count option-sequence)
- (:c @submitted-option-counts)))
- (is (= (count (filter (partial = :a) option-sequence))
- (:a @submitted-option-counts)))
- (is (= (count (filter (partial = :b) option-sequence))
- (:b @submitted-option-counts))))))
+ (letfn [(verify-workflow-options [options]
+ (is (:supports_common_options options))
+ (is (:supports_options options))
+ (is (:overwritten options)))
+ (verify-submitted-options [env _ _ _ options _]
+ (let [defaults (util/make-options env)]
+ (verify-workflow-options options)
+ (is (= defaults (select-keys options (keys defaults))))
+ (UUID/randomUUID)))]
+ (with-redefs-fn {#'submit-workflow verify-submitted-options}
+ (fn []
+ (->
+ (make-wgs-workload-request)
+ (assoc-in [:common :options]
+ {:supports_common_options true :overwritten false})
+ (update :items
+ (partial map
+ #(assoc % :options {:supports_options true :overwritten true})))
+ workloads/execute-workload!
+ :workflows
+        (->> (run! (comp verify-workflow-options :options))))))))
diff --git a/api/test/wfl/integration/modules/xx_test.clj b/api/test/wfl/integration/modules/xx_test.clj
index 458ba8c85..c2d640db7 100644
--- a/api/test/wfl/integration/modules/xx_test.clj
+++ b/api/test/wfl/integration/modules/xx_test.clj
@@ -1,14 +1,15 @@
(ns wfl.integration.modules.xx-test
(:require [clojure.test :refer [deftest testing is] :as clj-test]
+ [clojure.string :as str]
+ [wfl.environments :as env]
+ [wfl.module.xx :as xx]
[wfl.service.cromwell :refer [wait-for-workflow-complete submit-workflows]]
[wfl.tools.endpoints :as endpoints]
[wfl.tools.fixtures :as fixtures]
[wfl.tools.workloads :as workloads]
- [wfl.module.xx :as xx]
- [wfl.util :refer [absent? on make-options]]
- [wfl.environments :as env])
- (:import (java.util UUID)
- (java.time OffsetDateTime)))
+ [wfl.util :as util :refer [absent? make-options]])
+ (:import (java.time OffsetDateTime)
+ (java.util UUID)))
(clj-test/use-fixtures :once fixtures/temporary-postgresql-database)
@@ -42,11 +43,11 @@
(let [common-inputs {:bait_set_name "Geoff"
:bait_interval_list "gs://fake-input-bucket/interval-list"}]
(letfn [(go! [inputs]
- (letfn [(value-equal? [key] (partial on = key common-inputs inputs))]
+ (letfn [(value-equal? [key] (= (key common-inputs) (key inputs)))]
(is (value-equal? :bait_set_name))
(is (value-equal? :bait_interval_list))))]
(run! (comp go! :inputs) (-> (make-xx-workload-request)
- (assoc :common_inputs common-inputs)
+ (assoc-in [:common :inputs] common-inputs)
workloads/create-workload!
:workflows)))))
@@ -83,37 +84,53 @@
workloads/update-workload!)]
(is (:finished workload))))
+(deftest test-submitted-workflow-inputs
+ (letfn [(prefixed? [prefix key] (str/starts-with? (str key) (str prefix)))
+ (strip-prefix [[k v]]
+ [(keyword (util/unprefix (str k) ":ExternalExomeReprocessing."))
+ v])
+ (verify-workflow-inputs [inputs]
+ (is (:supports_common_inputs inputs))
+ (is (:supports_inputs inputs))
+ (is (:overwritten inputs)))
+ (verify-submitted-inputs [_ _ _ inputs _ _]
+ (map
+ (fn [in]
+ (is (every? #(prefixed? :ExternalExomeReprocessing %) (keys in)))
+ (verify-workflow-inputs (into {} (map strip-prefix in)))
+ (UUID/randomUUID))
+ inputs))]
+ (with-redefs-fn {#'submit-workflows verify-submitted-inputs}
+ (fn []
+ (->
+ (make-xx-workload-request)
+ (assoc-in [:common :inputs]
+ {:supports_common_inputs true :overwritten false})
+ (update :items
+ (partial map
+ #(update % :inputs
+ (fn [xs] (merge xs {:supports_inputs true :overwritten true})))))
+ workloads/execute-workload!)))))
+
(deftest test-workflow-options
- (let [option-sequence [:a :a :b]
- workload-request (-> (make-xx-workload-request)
- (update :items (fn [existing]
- (mapv #(assoc %1 :workflow_options {%2 "some value"})
- (repeat (first existing)) option-sequence)))
- (assoc :workflow_options {:c "some other value"}))
- submitted-option-counts (atom {})
- ;; Mock cromwell/submit-workflows (note the plural), count observed option keys per workflow
- pretend-submit (fn [_ _ _ inputs options _]
- (run! #(swap! submitted-option-counts update % (fnil (partial + (count inputs)) 0))
- (keys options))
- (repeatedly (count inputs) #(str (UUID/randomUUID))))]
- (with-redefs-fn {#'submit-workflows pretend-submit}
- #(-> workload-request
- workloads/execute-workload!
- (as-> workload
- (testing "Options in server response"
- (is (= (count option-sequence)
- (count (filter (fn [w] (get-in w [:workflow_options :c])) (:workflows workload)))))
- (is (= (count (filter (partial = :a) option-sequence))
- (count (filter (fn [w] (get-in w [:workflow_options :a])) (:workflows workload)))))
- (is (= (count (filter (partial = :b) option-sequence))
- (count (filter (fn [w] (get-in w [:workflow_options :b])) (:workflows workload)))))
- (is (workloads/baseline-options-across-workload
- (make-options (xx/get-cromwell-environment workload))
- workload))))))
- (testing "Options sent to Cromwell"
- (is (= (count option-sequence)
- (:c @submitted-option-counts)))
- (is (= (count (filter (partial = :a) option-sequence))
- (:a @submitted-option-counts)))
- (is (= (count (filter (partial = :b) option-sequence))
- (:b @submitted-option-counts))))))
+ (letfn [(verify-workflow-options [options]
+ (is (:supports_common_options options))
+ (is (:supports_options options))
+ (is (:overwritten options)))
+ (verify-submitted-options [env _ _ inputs options _]
+ (let [defaults (util/make-options env)]
+ (verify-workflow-options options)
+ (is (= defaults (select-keys options (keys defaults))))
+ (map (fn [_] (UUID/randomUUID)) inputs)))]
+ (with-redefs-fn {#'submit-workflows verify-submitted-options}
+ (fn []
+ (->
+ (make-xx-workload-request)
+ (assoc-in [:common :options]
+ {:supports_common_options true :overwritten false})
+ (update :items
+ (partial map
+ #(assoc % :options {:supports_options true :overwritten true})))
+ workloads/execute-workload!
+ :workflows
+        (->> (run! (comp verify-workflow-options :options))))))))
diff --git a/api/test/wfl/tools/workloads.clj b/api/test/wfl/tools/workloads.clj
index 4b1539cbc..c0ddf66c4 100644
--- a/api/test/wfl/tools/workloads.clj
+++ b/api/test/wfl/tools/workloads.clj
@@ -1,6 +1,8 @@
(ns wfl.tools.workloads
- (:require [clojure.tools.logging.readable :as log]
+ (:require [clojure.string :as str]
+ [clojure.tools.logging.readable :as log]
[wfl.environments :refer [stuff]]
+ [wfl.jdbc :as jdbc]
[wfl.module.aou :as aou]
[wfl.module.copyfile :as cp]
[wfl.module.wgs :as wgs]
@@ -8,31 +10,35 @@
[wfl.service.postgres :as postgres]
[wfl.service.cromwell :as cromwell]
[wfl.tools.endpoints :as endpoints]
- [wfl.util :refer [shell!]]
- [wfl.util :as util]
- [wfl.jdbc :as jdbc]
- [wfl.tools.fixtures :as fixtures])
+ [wfl.tools.fixtures :as fixtures]
+ [wfl.util :as util :refer [shell!]])
(:import (java.util.concurrent TimeoutException)))
(def git-branch (delay (util/shell! "git" "branch" "--show-current")))
(def wgs-inputs
- {:unmapped_bam_suffix ".unmapped.bam",
- :sample_name "NA12878 PLUMBING",
- :base_file_name "NA12878_PLUMBING",
- :final_gvcf_base_name "NA12878_PLUMBING",
- :input_cram "develop/20k/NA12878_PLUMBING.cram"})
+ (let [input-folder
+ (str/join "/" ["gs://broad-gotc-dev-wfl-ptc-test-inputs"
+ "single_sample/plumbing/truth/develop/20k/"])]
+ {:unmapped_bam_suffix ".unmapped.bam",
+ :sample_name "NA12878 PLUMBING",
+ :base_file_name "NA12878_PLUMBING",
+ :final_gvcf_base_name "NA12878_PLUMBING",
+ :input_cram (str input-folder "NA12878_PLUMBING.cram")}))
(defn wgs-workload-request
[identifier]
"A whole genome sequencing workload used for testing."
- (let [path "/single_sample/plumbing/truth"]
- {:cromwell (get-in stuff [:wgs-dev :cromwell :url])
- :input (str "gs://broad-gotc-dev-wfl-ptc-test-inputs" path)
- :output (str "gs://broad-gotc-dev-wfl-ptc-test-outputs/wgs-test-output/" identifier)
- :pipeline wgs/pipeline
- :project (format "(Test) %s" @git-branch)
- :items [{:inputs wgs-inputs}]}))
+ {:cromwell (get-in stuff [:wgs-dev :cromwell :url])
+ :output (str "gs://broad-gotc-dev-wfl-ptc-test-outputs/wgs-test-output/" identifier)
+ :pipeline wgs/pipeline
+ :project (format "(Test) %s" @git-branch)
+ :items [{:inputs wgs-inputs}]
+ :common {:inputs (-> {:disable_sanity_check true}
+ (util/prefix-keys :CheckContamination)
+ (util/prefix-keys :UnmappedBamToAlignedBam)
+ (util/prefix-keys :WholeGenomeGermlineSingleSample)
+ (util/prefix-keys :WholeGenomeReprocessing))}})
(defn aou-workload-request
"An allofus arrays workload used for testing.
@@ -69,16 +75,23 @@
:project (format "(Test) %s" @git-branch)
:items [{:inputs {:src src :dst dst}}]})
+(def xx-inputs
+ (let [storage "gs://broad-gotc-dev-wfl-ptc-test-inputs/single_sample/plumbing/truth/develop/20k/"]
+ {:input_cram (str storage "NA12878_PLUMBING.cram")}))
+
(defn xx-workload-request
[identifier]
"A whole genome sequencing workload used for testing."
- (let [test-storage "gs://broad-gotc-dev-wfl-ptc-test-inputs/single_sample/plumbing/truth/develop/20k/"]
- {:cromwell (get-in stuff [:xx-dev :cromwell :url])
- :output (str "gs://broad-gotc-dev-wfl-ptc-test-outputs/xx-test-output/" identifier)
- :pipeline xx/pipeline
- :project (format "(Test) %s" @git-branch)
- :common_inputs {:ExomeReprocessing.ExomeGermlineSingleSample.UnmappedBamToAlignedBam.CheckContamination.disable_sanity_check true}
- :items [{:inputs {:input_cram (str test-storage "NA12878_PLUMBING.cram")}}]}))
+ {:cromwell (get-in stuff [:xx-dev :cromwell :url])
+ :output (str "gs://broad-gotc-dev-wfl-ptc-test-outputs/xx-test-output/" identifier)
+ :pipeline xx/pipeline
+ :project (format "(Test) %s" @git-branch)
+ :items [{:inputs xx-inputs}]
+ :common {:inputs (-> {:disable_sanity_check true}
+ (util/prefix-keys :CheckContamination)
+ (util/prefix-keys :UnmappedBamToAlignedBam)
+ (util/prefix-keys :ExomeGermlineSingleSample)
+ (util/prefix-keys :ExomeReprocessing))}})
(defn when-done
"Call `done!` when cromwell has finished executing `workload`'s workflows."
@@ -102,12 +115,6 @@
(done! (endpoints/get-workload-status (:uuid workload)))
nil))
-(defn baseline-options-across-workload
- "True if OPTIONS are present across the entire WORKLOAD."
- [options workload]
- (let [unique-options (distinct (map :workflow_options (:workflows workload)))]
- (every? #(= % (util/deep-merge % options)) unique-options)))
-
(defn create-workload! [workload-request]
(jdbc/with-db-transaction [tx (fixtures/testing-db-config)]
(wfl.api.workloads/create-workload! tx workload-request)))
diff --git a/api/test/wfl/unit/modules/wgs_test.clj b/api/test/wfl/unit/modules/wgs_test.clj
deleted file mode 100644
index e63c636cb..000000000
--- a/api/test/wfl/unit/modules/wgs_test.clj
+++ /dev/null
@@ -1,9 +0,0 @@
-(ns wfl.unit.modules.wgs-test
- (:require [clojure.test :refer :all]
- [wfl.module.wgs :as wgs]))
-
-(deftest test-serialized-version
- "in version 0.4.0, wgs workloads were serialized using CromwellWorkload table"
- (is (wgs/uses-cromwell-workload-table? {:version "0.4.0"}))
- (is (wgs/uses-cromwell-workload-table? {:version "1.0.0"}))
- (is (not (wgs/uses-cromwell-workload-table? {:version "0.3.8"}))))
diff --git a/api/test/wfl/unit/modules/xx_test.clj b/api/test/wfl/unit/modules/xx_test.clj
index 244dc3189..06926e1c7 100644
--- a/api/test/wfl/unit/modules/xx_test.clj
+++ b/api/test/wfl/unit/modules/xx_test.clj
@@ -2,38 +2,42 @@
(:require [clojure.test :refer :all]
[wfl.module.xx :as xx]))
-(deftest test-make-inputs
- (testing "make-inputs from cram"
- (let [output "gs://output"
- items {:input_cram "gs://input/sample.cram"}]
- (is (xx/make-combined-inputs-to-save output {} items))))
- (testing "make-inputs from bam"
- (let [output "gs://output"
- items {:input_bam "gs://input/sample.bam"}]
- (is (xx/make-combined-inputs-to-save output {} items)))))
+(def ^:private output-url "gs://fake-output-bucket/")
-(deftest test-common-inputs
- (testing "add common overrides to the workflows"
- (let [common {:bait_set_name "frank"}
- output "gs://output"
- items {:input_cram "gs://input/sample.cram"}]
- (is (= "frank" (:bait_set_name (xx/make-combined-inputs-to-save output common items)))))))
+(deftest test-make-inputs-from-cram
+ (let [sample "gs://fake-input-bucket/folder/sample.cram"
+ inputs (xx/make-inputs-to-save output-url {:input_cram sample})]
+ (is (= sample (:input_cram inputs)))
+ (is (= "sample" (:sample_name inputs)))
+ (is (= "sample.cram" (:base_file_name inputs)))
+ (is (= "sample.cram" (:final_gvcf_base_name inputs)))
+ (is (= (str output-url "folder") (:destination_cloud_path inputs)))))
-(deftest test-override-precedence
- (testing "add common overrides to the workflows"
- (let [common {:bait_set_name "frank"}
- output "gs://output"
- items {:input_cram "gs://input/sample.cram"
- :bait_set_name "geoff"}]
- (is (= "geoff" (:bait_set_name (xx/make-combined-inputs-to-save output common items)))))))
+(deftest test-make-inputs-from-bam
+ (let [sample "gs://fake-input-bucket/folder/sample.bam"
+ inputs (xx/make-inputs-to-save output-url {:input_bam sample})]
+ (is (= sample (:input_bam inputs)))
+ (is (= "sample" (:sample_name inputs)))
+ (is (= "sample.bam" (:base_file_name inputs)))
+ (is (= "sample.bam" (:final_gvcf_base_name inputs)))
+ (is (= (str output-url "folder") (:destination_cloud_path inputs)))))
-(deftest sample-name-behaviour
- (testing "specifying sample name"
- (let [output "gs://output"
- items {:input_cram "gs://input/sample.cram"
- :sample_name "dave"}]
- (is (= "dave" (:sample_name (xx/make-combined-inputs-to-save output {} items))))))
- (testing "computing the sample name"
- (let [output "gs://output"
- items {:input_cram "gs://input/sample.foo.bar.baz.cram"}]
- (is (= "sample" (:sample_name (xx/make-combined-inputs-to-save output {} items)))))))
+(deftest test-specifying-destination_cloud_path
+ (let [destination "gs://some-bucket/in-the-middle/of-nowhere.out"
+ inputs (xx/make-inputs-to-save output-url
+ {:input_bam "gs://fake-input-bucket/sample.bam"
+ :destination_cloud_path destination})]
+ (is (= destination (:destination_cloud_path inputs)))))
+
+(deftest test-specifying-sample_name
+ (let [name "geoff"
+ inputs (xx/make-inputs-to-save output-url
+ {:input_bam "gs://fake-input-bucket/sample.bam"
+ :sample_name name})]
+ (is (= name (:sample_name inputs)))))
+
+(deftest test-specifying-arbitrary-workflow-inputs
+ (is (:arbitrary
+ (xx/make-inputs-to-save output-url
+ {:input_bam "gs://fake-input-bucket/sample.bam"
+ :arbitrary "hai"}))))
diff --git a/api/test/wfl/unit/workloads_test.clj b/api/test/wfl/unit/workloads_test.clj
new file mode 100644
index 000000000..fdf782260
--- /dev/null
+++ b/api/test/wfl/unit/workloads_test.clj
@@ -0,0 +1,15 @@
+(ns wfl.unit.workloads-test
+ (:require [clojure.test :refer :all]
+ [wfl.api.workloads :as workloads])
+ (:import (clojure.lang ExceptionInfo)))
+
+(deftest test-saved-before?
+ "in version 0.4.0, wgs workloads were serialized using CromwellWorkload table"
+ (is (not (workloads/saved-before? "0.4.0" {:version "0.4.0"})))
+ (is (not (workloads/saved-before? "0.4.0" {:version "1.0.0"})))
+ (is (not (workloads/saved-before? "0.4.0" {:version "0.4.1"})))
+ (is (workloads/saved-before? "0.4.0" {:version "0.3.8"}))
+ (is (workloads/saved-before? "0.4.0" {:version "0.0.0"}))
+ (is (thrown? ExceptionInfo (workloads/saved-before? "0.4" {:version "0.0.0"}))))
+
+
diff --git a/database/changelog.xml b/database/changelog.xml
index 2a87d5efe..2da0341ca 100644
--- a/database/changelog.xml
+++ b/database/changelog.xml
@@ -21,5 +21,4 @@
-    <include file="changesets/20201031_WorkflowOptions.xml"/>
diff --git a/database/changesets/20201029_CromwellWorkflow.xml b/database/changesets/20201029_CromwellWorkflow.xml
index 299eb2766..f80c65c35 100644
--- a/database/changesets/20201029_CromwellWorkflow.xml
+++ b/database/changesets/20201029_CromwellWorkflow.xml
@@ -16,12 +16,14 @@
ALTER TABLE workload ALTER COLUMN input DROP NOT NULL
- CREATE Type CromwellWorkflow AS (
- id bigint, -- primary key
- status text, -- Cromwell workflow status
- updated timestamptz, -- status update time
- uuid text, -- Cromwell workflow UUID
- inputs text -- JSON string of non-default workflow inputs
+ CREATE Type CromwellWorkflow AS
+ (
+ id bigint, -- primary key
+ status text, -- Cromwell workflow status
+ updated timestamptz, -- status update time
+ uuid text, -- Cromwell workflow UUID
+ inputs text, -- JSON string of non-default workflow inputs
+ options text -- JSON string of Cromwell workflow options
)
diff --git a/database/changesets/20201031_WorkflowOptions.xml b/database/changesets/20201031_WorkflowOptions.xml
deleted file mode 100644
index 6411f4625..000000000
--- a/database/changesets/20201031_WorkflowOptions.xml
+++ /dev/null
@@ -1,26 +0,0 @@
-
-
-
-
- Add workflow options attributes to existing workflow types
-
-
- ALTER TYPE copyfile
- ADD ATTRIBUTE workflow_options text CASCADE -- workflow options to be passed to Cromwell
-
-
- ALTER TYPE CromwellWorkflow
- ADD ATTRIBUTE workflow_options text CASCADE -- workflow options to be passed to Cromwell
-
-
-