Skip to content

Commit

Permalink
[GH-1045] Arbitrary WGS Inputs (#218)
Browse files Browse the repository at this point in the history
* use "common" for workload-level inputs and options. fix copyfile+wgs loading

* handle reference merging properly

* merge the reference_fasta prefixes properly and test

* remove nil "options"

* fix copyfile after system tests

* i hate this middleware crap

* give up with spec

* add more tests!

* mark workflows as skipped if they are to be

* macroexpand

* un-weirdify

* don't disable sanity checking by default

* enforce semver length
  • Loading branch information
ehigham authored Nov 12, 2020
1 parent 66cb087 commit ff13c3a
Show file tree
Hide file tree
Showing 17 changed files with 539 additions and 461 deletions.
4 changes: 2 additions & 2 deletions api/src/wfl/api/handlers.clj
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
"Start the workload with UUID in REQUEST."
[request]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(let [{uuid :uuid} (get-in request [:parameters :body])]
(let [{uuid :uuid} (:body-params request)]
(logr/infof "post-start endpoint called: uuid=%s" uuid)
(let [workload (workloads/load-workload-for-uuid tx uuid)]
(->>
Expand All @@ -99,7 +99,7 @@
"Create and start workload described in BODY of REQUEST"
[request]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(let [workload-request (get-in request [:parameters :body])]
(let [workload-request (:body-params request)]
(logr/info "executing workload-request: " workload-request)
(->>
(gcs/userinfo request)
Expand Down
15 changes: 7 additions & 8 deletions api/src/wfl/api/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
(s/def ::input_bam #(str/ends-with? % ".bam"))
(s/def ::input_cram #(str/ends-with? % ".cram"))
(s/def ::output string?)
(s/def ::common_inputs map?)
(s/def ::pipeline string?)
(s/def ::project string?)
(s/def ::release string?)
Expand All @@ -31,11 +30,11 @@
(s/def ::uuid-query (s/keys :opt-un [::uuid]))
(s/def ::version string?)
(s/def ::wdl string?)
(s/def ::workflow_options map?)
(s/def ::workload-request (s/keys :opt-un [::input
::items
::common_inputs
::workflow_options]
(s/def ::options map?)
(s/def ::common map?)
(s/def ::workload-request (s/keys :opt-un [::common
::input
::items]
:req-un [::cromwell
::output
::pipeline
Expand All @@ -60,7 +59,7 @@
;; compound
(s/def ::items (s/* ::workload-inputs))
(s/def ::workload-inputs (s/keys :req-un [::inputs]
:opt-un [::workflow_options]))
:opt-un [::options]))
(s/def ::inputs (s/or :aou ::aou-workflow-inputs
:copyfile ::copyfile-workflow-inputs
:wgs ::wgs-workflow-inputs
Expand All @@ -69,7 +68,7 @@
(s/def ::workflows (s/* ::workflow))
(s/def ::workflow
(s/keys :req-un [::inputs]
:opt-un [::status ::updated ::uuid ::workflow_options]))
:opt-un [::status ::updated ::uuid ::options]))

;; aou
(s/def ::analysis_version_number integer?)
Expand Down
27 changes: 20 additions & 7 deletions api/src/wfl/api/workloads.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns wfl.api.workloads
(:require [wfl.service.postgres :as postgres]
(:require [clojure.string :as str]
[wfl.jdbc :as jdbc]
[wfl.service.postgres :as postgres]
[wfl.util :as util]))

;; always derive from base :wfl/exception
Expand Down Expand Up @@ -110,18 +111,30 @@
[tx workload]
(letfn [(unnilify [m] (into {} (filter second m)))
(split-inputs [m]
(let [keep [:id :finished :status :updated :uuid :workflow_options]]
(assoc (select-keys m keep)
:inputs (unnilify (apply dissoc m keep)))))
(unpack-options [m]
(update m :workflow_options #(when % (util/parse-json %))))]
(let [keep [:id :finished :status :updated :uuid :options]]
(assoc (select-keys m keep) :inputs (apply dissoc m keep))))
(load-options [m] (update m :options (fnil util/parse-json "null")))]
(try
(->> (postgres/get-table tx (:items workload))
(mapv (comp unnilify split-inputs unpack-options))
(mapv (comp unnilify split-inputs load-options))
(assoc workload :workflows)
unnilify)
(catch Throwable cause
(throw (ex-info "Error loading workload"
{:workload workload} cause))))))

(defoverload load-workload-impl :default default-load-workload-impl)

;; Common workload operations
(defn saved-before?
  "Return true when the `:version` of `_workload` is strictly lower than the
  `reference` version string, comparing major, then minor, then patch.
  Both version strings must be in the form \"major.minor.patch\"; a string
  that does not split into exactly three components raises an `ex-info`
  with the message \"malformed version string\"."
  [reference {:keys [version] :as _workload}]
  (letfn [;; Split on dots, parse each component and insist on three of them.
          (parse [s]
            (let [parts (map util/parse-int (str/split s #"\."))]
              (if (= 3 (count parts))
                parts
                (throw (ex-info "malformed version string"
                                {:version (str/join "." parts)})))))
          ;; Lexicographic strict less-than over the parsed components.
          (older? [[a & more-a] [b & more-b]]
            (cond
              (< a b)  true
              (== a b) (and (some? more-a)
                            (some? more-b)
                            (older? more-a more-b))
              :else    false))]
    (util/on older? parse version reference)))
33 changes: 24 additions & 9 deletions api/src/wfl/module/batch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"Some utilities shared between batch workloads in cromwell."
(:require [clojure.pprint :refer [pprint]]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
[wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wfl :as wfl])
(:import [java.util UUID]))

Expand All @@ -12,17 +15,29 @@
[tx {:keys [release top] :as _workflow-wdl} {:keys [pipeline] :as workload-request}]
(let [[{:keys [id]}]
(-> workload-request
(select-keys [:creator :cromwell :input :output :project])
(merge (-> (wfl/get-the-version) (select-keys [:commit :version])))
(assoc :release release
:wdl top
:uuid (UUID/randomUUID))
(->> (jdbc/insert! tx :workload)))
(select-keys [:creator :cromwell :input :output :project])
(update :cromwell all/de-slashify)
(merge (select-keys (wfl/get-the-version) [:commit :version]))
(assoc :release release :wdl top :uuid (UUID/randomUUID))
(->> (jdbc/insert! tx :workload)))
table (format "%s_%09d" pipeline id)]
(jdbc/execute! tx
["UPDATE workload SET pipeline = ?::pipeline WHERE id = ?" pipeline id])
["UPDATE workload SET pipeline = ?::pipeline WHERE id = ?" pipeline id])
(jdbc/db-do-commands tx
(map #(format "CREATE TABLE %s OF CromwellWorkflow (PRIMARY KEY (id))" %)
[table]))
(map #(format "CREATE TABLE %s OF CromwellWorkflow (PRIMARY KEY (id))" %)
[table]))
(jdbc/update! tx :workload {:items table} ["id = ?" id])
[id table]))

(defn load-batch-workload-impl
  "Use transaction `tx` to load and associate the workflows in the `workload`
  stored in a CromwellWorkflow table.

  The `:inputs` and `:options` of every row read from the `items` table, and
  the `:options` of the `workload` itself, are passed through
  `util/parse-json`. Entries whose value is nil are then removed from each
  workflow and from the returned workload. The workflows are returned as a
  vector under `:workflows`."
  [tx {:keys [items] :as workload}]
  (letfn [;; Remove only nil-valued entries. Filtering on the raw value would
          ;; also discard entries whose value is `false`.
          (unnilify [m] (into {} (filter (comp some? second) m)))
          ;; A nil (or absent) value is replaced by the JSON text "null"
          ;; before parsing, so `util/parse-json` never receives nil.
          ;; NOTE(review): presumably "null" parses back to nil and is then
          ;; dropped by `unnilify` - confirm against `util/parse-json`.
          (parse-json-at [k m] (update m k (fnil util/parse-json "null")))
          (load-inputs [m] (parse-json-at :inputs m))
          (load-options [m] (parse-json-at :options m))]
    (->> (postgres/get-table tx items)
         (mapv (comp unnilify load-options load-inputs))
         (assoc workload :workflows)
         load-options
         unnilify)))
74 changes: 43 additions & 31 deletions api/src/wfl/module/copyfile.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
(ns wfl.module.copyfile
"A dummy module for smoke testing wfl/cromwell auth."
(:require [clojure.data.json :as json]
[wfl.api.workloads :as workloads]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
[wfl.module.batch :as batch]
[wfl.service.cromwell :as cromwell]
[wfl.util :as util])
(:import [java.time OffsetDateTime]))
Expand All @@ -14,57 +15,68 @@
{:release "copyfile-v1.0"
:top "wdl/copyfile.wdl"})

(defn- submit-workflow
(defn ^:private get-cromwell-environment
  "Call `all/cromwell-environments` with the set #{:gotc-dev :gotc-prod} and
  the `:cromwell` URL of the argument map, and return its only result.
  Throw an `ex-info` carrying the URL and the returned environments when
  the result does not contain exactly one element."
  [{:keys [cromwell]}]
  (let [candidates (all/cromwell-environments #{:gotc-dev :gotc-prod} cromwell)]
    (if (= 1 (count candidates))
      (first candidates)
      (throw (ex-info "no unique environment matching Cromwell URL."
                      {:cromwell     cromwell
                       :environments candidates})))))

(defn ^:private submit-workflow
"Submit WORKFLOW to Cromwell in ENVIRONMENT with OPTIONS and LABELS."
[environment workflow options labels]
[environment inputs options labels]
(cromwell/submit-workflow
environment
(util/extract-resource (:top workflow-wdl))
nil
(-> workflow (select-keys [:src :dst]) (util/prefix-keys pipeline))
(-> inputs (util/prefix-keys pipeline))
options
labels))

(defn add-copyfile-workload!
  "Use transaction TX to add the workload described by WORKLOAD-REQUEST.
  One row is inserted per item: the item's :inputs plus a JSON-encoded
  :workflow_options column and an :id counting up from 0 in item order.
  Return the id produced by `all/add-workload-table!`."
  [tx {:keys [items] :as _workload-request}]
  (let [environment (first (all/cromwell-environments
                            (:cromwell _workload-request)))
        ;; Options shared by every item: the environment's options combined
        ;; with the request-level :workflow_options.
        defaults    (util/deep-merge (util/make-options environment)
                                     (:workflow_options _workload-request))
        [id table]  (all/add-workload-table! tx workflow-wdl _workload-request)
        rows        (map-indexed
                     (fn [index {:keys [inputs workflow_options]}]
                       (assoc inputs
                              :workflow_options
                              (json/write-str
                               (util/deep-merge defaults workflow_options))
                              :id index))
                     items)]
    (jdbc/insert-multi! tx table rows)
    id))
(defn create-copyfile-workload!
  "Use transaction TX to add the workload described by REQUEST, then return
  the result of `workloads/load-workload-for-id` for the new workload id.

  For each item, :inputs and :options are combined with the matching entry
  of the request's :common map via `util/deep-merge` (common first, item
  second) and written as JSON text. Rows get an :id counting up from 0 in
  the order of :items."
  [tx {:keys [items common] :as request}]
  (let [{shared-inputs :inputs shared-options :options} common
        as-json    (fn [shared specific]
                     (json/write-str (util/deep-merge shared specific)))
        to-row     (fn [index workflow]
                     (-> workflow
                         (assoc :id index)
                         (update :inputs (partial as-json shared-inputs))
                         (update :options (partial as-json shared-options))))
        [id table] (batch/add-workload-table! tx workflow-wdl request)]
    (jdbc/insert-multi! tx table (map-indexed to-row items))
    (workloads/load-workload-for-id tx id)))

(defn start-copyfile-workload!
"Use transaction TX to start _WORKLOAD."
[tx {:keys [cromwell items uuid] :as workload}]
(let [env (first (all/cromwell-environments cromwell))]
(letfn [(submit! [{:keys [id inputs workflow_options]}]
[id (submit-workflow env inputs workflow_options {:workload uuid}) "Submitted"])
(update! [tx [id uuid status]]
[tx {:keys [items uuid] :as workload}]
(let [env (get-cromwell-environment workload)
default-options (util/make-options env)]
(letfn [(submit! [{:keys [id inputs options]}]
[id (submit-workflow env inputs
(util/deep-merge default-options options)
{:workload uuid})])
(update! [tx [id uuid]]
(jdbc/update! tx items
{:updated (OffsetDateTime/now) :uuid uuid :status status}
{:updated (OffsetDateTime/now) :uuid uuid :status "Submitted"}
["id = ?" id]))]
(run! (comp (partial update! tx) submit!) (:workflows workload))
(jdbc/update! tx :workload
{:started (OffsetDateTime/now)} ["uuid = ?" uuid]))))

;; Add the copyfile workload described by `request`, then load it back by
;; the id that `add-copyfile-workload!` returns.
(defmethod workloads/create-workload!
  pipeline
  [tx request]
  (let [id (add-copyfile-workload! tx request)]
    (workloads/load-workload-for-id tx id)))
;; Use `create-copyfile-workload!` as the `workloads/create-workload!`
;; implementation for this module's `pipeline` dispatch value.
;; NOTE(review): `defoverload` is referred from wfl.api.workloads and
;; presumably expands to a `defmethod` that forwards its arguments - confirm.
(defoverload workloads/create-workload! pipeline create-copyfile-workload!)

;; Start the copyfile workload, then return it freshly loaded by id.
(defmethod workloads/start-workload!
  pipeline
  [tx {:keys [id] :as workload}]
  (start-copyfile-workload! tx workload)
  (workloads/load-workload-for-id tx id))

;; Workloads whose saved version is before "0.4.0" are loaded with the
;; default loader; all others are loaded with the batch loader.
(defmethod workloads/load-workload-impl
  pipeline
  [tx workload]
  (let [load-impl (if (workloads/saved-before? "0.4.0" workload)
                    workloads/default-load-workload-impl
                    batch/load-batch-workload-impl)]
    (load-impl tx workload)))
Loading

0 comments on commit ff13c3a

Please sign in to comment.