Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GH-1045] Arbitrary WGS Inputs #218

Merged
merged 14 commits into from
Nov 12, 2020
4 changes: 2 additions & 2 deletions api/src/wfl/api/handlers.clj
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
"Start the workload with UUID in REQUEST."
[request]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(let [{uuid :uuid} (get-in request [:parameters :body])]
(let [{uuid :uuid} (:body-params request)]
(logr/infof "post-start endpoint called: uuid=%s" uuid)
(let [workload (workloads/load-workload-for-uuid tx uuid)]
(->>
Expand All @@ -99,7 +99,7 @@
"Create and start workload described in BODY of REQUEST"
[request]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(let [workload-request (get-in request [:parameters :body])]
(let [workload-request (:body-params request)]
(logr/info "executing workload-request: " workload-request)
(->>
(gcs/userinfo request)
Expand Down
15 changes: 7 additions & 8 deletions api/src/wfl/api/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
(s/def ::input_bam #(str/ends-with? % ".bam"))
(s/def ::input_cram #(str/ends-with? % ".cram"))
(s/def ::output string?)
(s/def ::common_inputs map?)
(s/def ::pipeline string?)
(s/def ::project string?)
(s/def ::release string?)
Expand All @@ -31,11 +30,11 @@
(s/def ::uuid-query (s/keys :opt-un [::uuid]))
(s/def ::version string?)
(s/def ::wdl string?)
(s/def ::workflow_options map?)
(s/def ::workload-request (s/keys :opt-un [::input
::items
::common_inputs
::workflow_options]
(s/def ::options map?)
(s/def ::common map?)
(s/def ::workload-request (s/keys :opt-un [::common
::input
::items]
:req-un [::cromwell
::output
::pipeline
Expand All @@ -60,7 +59,7 @@
;; compound
(s/def ::items (s/* ::workload-inputs))
(s/def ::workload-inputs (s/keys :req-un [::inputs]
:opt-un [::workflow_options]))
:opt-un [::options]))
(s/def ::inputs (s/or :aou ::aou-workflow-inputs
:copyfile ::copyfile-workflow-inputs
:wgs ::wgs-workflow-inputs
Expand All @@ -69,7 +68,7 @@
(s/def ::workflows (s/* ::workflow))
(s/def ::workflow
(s/keys :req-un [::inputs]
:opt-un [::status ::updated ::uuid ::workflow_options]))
:opt-un [::status ::updated ::uuid ::options]))

;; aou
(s/def ::analysis_version_number integer?)
Expand Down
27 changes: 20 additions & 7 deletions api/src/wfl/api/workloads.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns wfl.api.workloads
(:require [wfl.service.postgres :as postgres]
(:require [clojure.string :as str]
[wfl.jdbc :as jdbc]
[wfl.service.postgres :as postgres]
[wfl.util :as util]))

;; always derive from base :wfl/exception
Expand Down Expand Up @@ -110,18 +111,30 @@
[tx workload]
(letfn [(unnilify [m] (into {} (filter second m)))
(split-inputs [m]
(let [keep [:id :finished :status :updated :uuid :workflow_options]]
(assoc (select-keys m keep)
:inputs (unnilify (apply dissoc m keep)))))
(unpack-options [m]
(update m :workflow_options #(when % (util/parse-json %))))]
(let [keep [:id :finished :status :updated :uuid :options]]
(assoc (select-keys m keep) :inputs (apply dissoc m keep))))
(load-options [m] (update m :options (fnil util/parse-json "null")))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should build that into parse-json?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be uses where passing nil into this is an error - building the fnil'ness into parse-json would make that harder to detect.

(try
(->> (postgres/get-table tx (:items workload))
(mapv (comp unnilify split-inputs unpack-options))
(mapv (comp unnilify split-inputs load-options))
(assoc workload :workflows)
unnilify)
(catch Throwable cause
(throw (ex-info "Error loading workload"
{:workload workload} cause))))))

(defoverload load-workload-impl :default default-load-workload-impl)

;; Common workload operations
(defn saved-before?
"Test if the `_workload` was saved before the `reference` version string.
Version strings must be in the form \"major.minor.patch\"."
[reference {:keys [version] :as _workload}]
(letfn [(decode [v] (map util/parse-int (str/split v #"\.")))
(validate [v] (when (not= 3 (count v))
(throw (ex-info "malformed version string"
{:version (str/join "." v)})))
v)
(lt? [[x & xs] [y & ys]]
(or (< x y) (and (== x y) (every? some? [xs ys]) (lt? xs ys))))]
(util/on lt? (comp validate decode) version reference)))
33 changes: 24 additions & 9 deletions api/src/wfl/module/batch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"Some utilities shared between batch workloads in cromwell."
(:require [clojure.pprint :refer [pprint]]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
[wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wfl :as wfl])
(:import [java.util UUID]))

Expand All @@ -12,17 +15,29 @@
[tx {:keys [release top] :as _workflow-wdl} {:keys [pipeline] :as workload-request}]
(let [[{:keys [id]}]
(-> workload-request
(select-keys [:creator :cromwell :input :output :project])
(merge (-> (wfl/get-the-version) (select-keys [:commit :version])))
(assoc :release release
:wdl top
:uuid (UUID/randomUUID))
(->> (jdbc/insert! tx :workload)))
(select-keys [:creator :cromwell :input :output :project])
(update :cromwell all/de-slashify)
(merge (select-keys (wfl/get-the-version) [:commit :version]))
(assoc :release release :wdl top :uuid (UUID/randomUUID))
(->> (jdbc/insert! tx :workload)))
table (format "%s_%09d" pipeline id)]
(jdbc/execute! tx
["UPDATE workload SET pipeline = ?::pipeline WHERE id = ?" pipeline id])
["UPDATE workload SET pipeline = ?::pipeline WHERE id = ?" pipeline id])
(jdbc/db-do-commands tx
(map #(format "CREATE TABLE %s OF CromwellWorkflow (PRIMARY KEY (id))" %)
[table]))
(map #(format "CREATE TABLE %s OF CromwellWorkflow (PRIMARY KEY (id))" %)
[table]))
(jdbc/update! tx :workload {:items table} ["id = ?" id])
[id table]))

(defn load-batch-workload-impl
"Use transaction `tx` to load and associate the workflows in the `workload`
stored in a CromwellWorkflow table."
[tx {:keys [items] :as workload}]
(letfn [(unnilify [m] (into {} (filter second m)))
(load-inputs [m] (update m :inputs (fnil util/parse-json "null")))
(load-options [m] (update m :options (fnil util/parse-json "null")))]
(->> (postgres/get-table tx items)
(mapv (comp unnilify load-options load-inputs))
(assoc workload :workflows)
load-options
unnilify)))
74 changes: 43 additions & 31 deletions api/src/wfl/module/copyfile.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
(ns wfl.module.copyfile
"A dummy module for smoke testing wfl/cromwell auth."
(:require [clojure.data.json :as json]
[wfl.api.workloads :as workloads]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
[wfl.module.batch :as batch]
[wfl.service.cromwell :as cromwell]
[wfl.util :as util])
(:import [java.time OffsetDateTime]))
Expand All @@ -14,57 +15,68 @@
{:release "copyfile-v1.0"
:top "wdl/copyfile.wdl"})

(defn- submit-workflow
(defn ^:private get-cromwell-environment [{:keys [cromwell]}]
(let [envs (all/cromwell-environments #{:gotc-dev :gotc-prod} cromwell)]
(when (not= 1 (count envs))
(throw (ex-info "no unique environment matching Cromwell URL."
{:cromwell cromwell
:environments envs})))
(first envs)))

(defn ^:private submit-workflow
"Submit WORKFLOW to Cromwell in ENVIRONMENT with OPTIONS and LABELS."
[environment workflow options labels]
[environment inputs options labels]
(cromwell/submit-workflow
environment
(util/extract-resource (:top workflow-wdl))
nil
(-> workflow (select-keys [:src :dst]) (util/prefix-keys pipeline))
(-> inputs (util/prefix-keys pipeline))
options
labels))

(defn add-copyfile-workload!
"Use transaction TX to add the workload described by WORKLOAD-REQUEST."
[tx {:keys [items] :as _workload-request}]
(let [workflow-options (-> (:cromwell _workload-request)
all/cromwell-environments
first
util/make-options
(util/deep-merge (:workflow_options _workload-request)))
[id table] (all/add-workload-table! tx workflow-wdl _workload-request)
to-row (fn [item] (assoc (:inputs item)
:workflow_options
(json/write-str (util/deep-merge workflow-options (:workflow_options item)))))]
(letfn [(add-id [m id] (assoc m :id id))]
(jdbc/insert-multi! tx table (map add-id (map to-row items) (range))))
id))
(defn create-copyfile-workload!
"Use transaction TX to add the workload described by REQUEST."
[tx {:keys [items common] :as request}]
(letfn [(merge-to-json [shared specific]
(json/write-str (util/deep-merge shared specific)))
(serialize [workflow id]
(-> workflow
(assoc :id id)
(update :inputs #(merge-to-json (:inputs common) %))
(update :options #(merge-to-json (:options common) %))))]
(let [[id table] (batch/add-workload-table! tx workflow-wdl request)]
(jdbc/insert-multi! tx table (map serialize items (range)))
(workloads/load-workload-for-id tx id))))
Comment on lines +37 to +49
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The create-workload! implementations for copyfile, wgs and xx are almost identical. In a subsequent change, we could explore reducing this technical debt. This means (in theory) we need to test only once for the "batch" workflows (or instantiate a test suite).


(defn start-copyfile-workload!
"Use transaction TX to start _WORKLOAD."
[tx {:keys [cromwell items uuid] :as workload}]
(let [env (first (all/cromwell-environments cromwell))]
(letfn [(submit! [{:keys [id inputs workflow_options]}]
[id (submit-workflow env inputs workflow_options {:workload uuid}) "Submitted"])
(update! [tx [id uuid status]]
[tx {:keys [items uuid] :as workload}]
(let [env (get-cromwell-environment workload)
default-options (util/make-options env)]
(letfn [(submit! [{:keys [id inputs options]}]
[id (submit-workflow env inputs
(util/deep-merge default-options options)
{:workload uuid})])
(update! [tx [id uuid]]
(jdbc/update! tx items
{:updated (OffsetDateTime/now) :uuid uuid :status status}
{:updated (OffsetDateTime/now) :uuid uuid :status "Submitted"}
["id = ?" id]))]
(run! (comp (partial update! tx) submit!) (:workflows workload))
(jdbc/update! tx :workload
{:started (OffsetDateTime/now)} ["uuid = ?" uuid]))))
Comment on lines 51 to 66
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementations for start-workload! still differ somewhat. This is due to the xx module using (a faked) batch submit to cromwell. In a subsequent change, we could explore moving the other modules onto that endpoint and actually implement it. Given that, maybe we can extract common functionality for these modules (c.f. create-workload!)


(defmethod workloads/create-workload!
pipeline
[tx request]
(->>
(add-copyfile-workload! tx request)
(workloads/load-workload-for-id tx)))
(defoverload workloads/create-workload! pipeline create-copyfile-workload!)

(defmethod workloads/start-workload!
pipeline
[tx {:keys [id] :as workload}]
(do
(start-copyfile-workload! tx workload)
(workloads/load-workload-for-id tx id)))

(defmethod workloads/load-workload-impl
pipeline
[tx workload]
(if (workloads/saved-before? "0.4.0" workload)
(workloads/default-load-workload-impl tx workload)
(batch/load-batch-workload-impl tx workload)))
Loading