Skip to content

Commit

Permalink
[GH-1586] Release 0.12.1 to AoU prod to pick up the version Arrays_v2…
Browse files Browse the repository at this point in the history
….6.3 WDL. (#558)

GH-1586 Run AoU pipeline with the Arrays_v2.6.3 WDL. (#557)
GH-1527 Remove Firecloud calls when fetching TerraExecutor workflows (#543)
  • Loading branch information
tbl3rd authored Jan 19, 2022
1 parent 3de05d5 commit 031a762
Show file tree
Hide file tree
Showing 19 changed files with 202 additions and 194 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Release 0.12.1
- [GH-1586] Run AoU pipeline with the Arrays_v2.6.3 WDL. ([#557](https://github.com/broadinstitute/wfl/pull/557))
- [GH-1527] Remove Firecloud calls when fetching TerraExecutor workflows ([#543](https://github.com/broadinstitute/wfl/pull/543))

# Release 0.12.0
- [GH-1581] Run AoU pipeline with the Arrays_v2.6.2 WDL. ([#554](https://github.com/broadinstitute/wfl/pull/554))
- [GH-1540] Apply User Comments to Terra Submissions ([#551](https://github.com/broadinstitute/wfl/pull/551))
Expand Down
4 changes: 2 additions & 2 deletions api/src/wfl/api/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
:sg ::sg/workflow-inputs
:other map?))

(s/def ::workflow (s/keys :req-un [::inputs]
:opt-un [::all/status ::all/updated ::all/uuid ::all/options]))
(s/def ::workflow (s/or :batch ::batch/workflow
:staged ::executor/executor-workflow))

(s/def ::workflows (s/* ::workflow))

Expand Down
165 changes: 76 additions & 89 deletions api/src/wfl/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,8 @@
(fn [{:keys [executor] :as _workload}] (:type executor)))

(defmulti executor-workflows
"Return the workflows managed by the `executor"
(fn [_transaction executor] (:type executor)))

(defmulti executor-workflows-by-filters
"Use db `transaction` to return the workflows created by the `executor`
matching `filters` (ex. status, submission)."
matching optional `filters` (ex. status, submission)."
(fn [_transaction executor _filters] (:type executor)))

(defmulti executor-throw-if-invalid-retry-filters
Expand Down Expand Up @@ -89,12 +85,27 @@
:fromSource :from_source})

;; Specs
(s/def ::entity ::all/uuid)
(s/def ::fromSource string?)
(s/def ::methodConfiguration (s/and string? util/terra-namespaced-name?))
(s/def ::methodConfigurationVersion integer?)
(s/def ::reference ::all/uuid)
(s/def ::submission ::all/uuid)
(s/def ::workflow ::all/uuid)

(s/def ::terra-executor (s/keys :req-un [::all/name
::all/fromSource
::all/methodConfiguration
::all/methodConfigurationVersion
::fromSource
::methodConfiguration
::methodConfigurationVersion
::all/workspace]))
(s/def ::submission util/uuid-string?)

;; Spec for a Terra executor workflow map.  All keys are unqualified.
;; Required: `entity`, `methodConfiguration`, `reference`, `submission`,
;; and `workspace`.  Optional: `workflow` and `status`.
(s/def ::terra-executor-workflow (s/keys :req-un [::entity
::methodConfiguration
::reference
::submission
::all/workspace]
:opt-un [::workflow
::all/status]))

(defn ^:private write-terra-executor [tx id executor]
(let [create "CREATE TABLE %s OF TerraExecutorDetails (PRIMARY KEY (id))"
Expand Down Expand Up @@ -261,11 +272,12 @@
"Assign workflow uuids from previous submissions"
[{:keys [workspace details] :as executor}]
(letfn [(read-a-submission-without-workflows []
(let [query "SELECT id, submission, entity FROM %s
WHERE submission IN (
SELECT submission FROM %s WHERE workflow IS NULL
LIMIT 1
)"]
(let [query (str/join \space ["SELECT id, submission, entity FROM %s"
"WHERE submission IN ("
"SELECT submission FROM %s"
"WHERE workflow IS NULL"
"LIMIT 1"
")"])]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(let [records (jdbc/query tx (format query details details))]
(when-first [{:keys [submission]} records]
Expand Down Expand Up @@ -315,7 +327,7 @@
[{:keys [workspace details] :as executor}]
(letfn [(update-status-from-firecloud
[{:keys [submission workflow] :as record}]
(->> (firecloud/get-workflow workspace submission workflow)
(->> (firecloud/get-workflow workspace submission workflow "status")
:status
(assoc record :status)))
(write-workflow-statuses [now records]
Expand Down Expand Up @@ -365,11 +377,11 @@
(defn ^:private peek-terra-executor-details
"Get first unconsumed successful workflow record from `details` table."
[{:keys [details] :as _executor}]
(let [query "SELECT * FROM %s
WHERE consumed IS NULL
AND status = 'Succeeded'
ORDER BY id ASC
LIMIT 1"]
(let [query (str/join \space ["SELECT * FROM %s"
"WHERE consumed IS NULL"
"AND status = 'Succeeded'"
"ORDER BY id ASC"
"LIMIT 1"])]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(->> (format query details)
(jdbc/query tx)
Expand Down Expand Up @@ -421,28 +433,18 @@
"Return the number of workflows in the `executor` queue that are yet to be
consumed."
[{:keys [details] :as _executor}]
(let [query "SELECT COUNT(*) FROM %s
WHERE consumed IS NULL
AND (status IS NULL OR
status NOT IN ('Failed', 'Aborted'))"]
(let [query (str/join \space ["SELECT COUNT(*) FROM %s"
"WHERE consumed IS NULL"
"AND (status IS NULL OR"
"status NOT IN ('Failed', 'Aborted'))"])]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(->> (format query details)
(jdbc/query tx)
first
:count))))

(defn ^:private terra-workflows-from-records
[{:keys [workspace] :as _executor} records]
(letfn [(from-record [{:keys [workflow submission status] :as record}]
(combine-record-workflow-and-outputs
record
(firecloud/get-workflow workspace submission workflow)
(when (= "Succeeded" status)
(firecloud/get-workflow-outputs workspace submission workflow))))]
(map from-record records)))

;; terra-executor-workflows and terra-executor-workflows-by-filters do not return
;; workflows that are being or have been retried. Why?
;; terra-executor-workflows does not return workflows
;; that are being or have been retried. Why?
;;
;; TL;DR: It's simpler maybe?
;;
Expand Down Expand Up @@ -482,74 +484,61 @@
;; not already an active workflow at the `HEAD`.
;;
;; ** Admittedly, this is still a problem with this design.

(defn ^:private terra-executor-workflows
"Return all the non-retried workflows executed by the `executor`."
[tx {:keys [details] :as executor}]
(postgres/throw-unless-table-exists tx details)
(let [query "SELECT * FROM %s
WHERE workflow IS NOT NULL
AND retry IS NULL
ORDER BY id ASC"]
(terra-workflows-from-records
executor
(jdbc/query tx (format query details)))))

(defn ^:private remove-nil-and-join
  "Join the non-nil elements of `vec` into a single string, inserting
  `separator` between consecutive elements.  When `separator` is omitted
  a single space is used."
  ([vec]
   (remove-nil-and-join vec \space))
  ([vec separator]
   ;; `some?` is true for everything except nil, so only nils are dropped.
   (str/join separator (filter some? vec))))

(defn ^:private terra-executor-workflows-by-filters-sql-params
(defn ^:private terra-executor-workflows-sql-params
"Return sql and params that query `details` for non-retried workflows
matching `submission` and/or `status` if specified."
[{:keys [details] :as _executor} {:keys [submission status] :as _filters}]
(let [optionals (remove-nil-and-join [(when submission "AND submission = ?")
(when status "AND status = ?")])
query (remove-nil-and-join ["SELECT * FROM %s"
"WHERE workflow IS NOT NULL"
"AND retry IS NULL"
optionals
"ORDER BY id ASC"])]
(let [query (remove-nil-and-join ["SELECT * FROM %s"
"WHERE workflow IS NOT NULL"
"AND retry IS NULL"
(when submission "AND submission = ?")
(when status "AND status = ?")
"ORDER BY id ASC"])]
(->> [submission status]
(concat [(format query details)])
(remove nil?))))

(defn ^:private terra-executor-workflows-by-filters
(defn ^:private terra-executor-workflows
"Return all the non-retried workflows executed by `executor`
matching specified `filters`."
[tx {:keys [details] :as executor} filters]
(postgres/throw-unless-table-exists tx details)
(->> (terra-executor-workflows-by-filters-sql-params executor filters)
(jdbc/query tx)
(terra-workflows-from-records executor)))

(let [sql-params (terra-executor-workflows-sql-params executor filters)
executor-subset (select-keys executor [:workspace :methodConfiguration])]
(map #(merge executor-subset %) (jdbc/query tx sql-params))))

;; 1. Presently Rawls doesn't support submitting a subset of a snapshot:
;; If we wish to retry 1+ workflows stemming from a snapshot,
;; we must resubmit the snapshot in its entirety.
;; 2. Though we are technically resubmitting snapshot references
;; and not submissions, this query takes a submission-based approach.
;; Why? Because there is no ambiguity when linking a submission
;; to its workflows. With the capacity for retries, one reference
;; may map to multiple sets of workflows, and retrying the reference
;; should only update the sibling workflow records of those supplied.
;;
(defn ^:private workflow-and-sibling-records
"Return the workflow records for all workflows in submissions
associated with the specified `workflows` --
i.e. records for `workflows` and their siblings."
[tx {:keys [details] :as executor} workflows]
(postgres/throw-unless-table-exists tx details)
(let [workflow-ids (util/to-quoted-comma-separated-list (map :uuid workflows))
_ (-> (str "%s For workflows %s, "
"fetching sibling workflow records by submission")
(format (log-prefix executor) workflow-ids)
log/debug)
;; 1. Presently Rawls doesn't support submitting a subset of a snapshot:
;; If we wish to retry 1+ workflows stemming from a snapshot,
;; we must resubmit the snapshot in its entirety.
;; 2. Though we are technically resubmitting snapshot references
;; and not submissions, this query takes a submission-based approach.
;; Why? Because there is no ambiguity when linking a submission
;; to its workflows. With the capacity for retries, one reference
;; may map to multiple sets of workflows, and retrying the reference
;; should only update the sibling workflow records of those supplied.
query "SELECT * FROM %s
WHERE submission IN
(SELECT DISTINCT submission FROM %s
WHERE workflow IN %s)"]
(let [workflow-ids (util/to-quoted-comma-separated-list
(map :workflow workflows))
query (str/join \space ["SELECT * FROM %s"
"WHERE submission IN"
"(SELECT DISTINCT submission FROM %s"
"WHERE workflow IN %s)"])]
(log/debug {:action "Fetching workflow-ids' sibling workflow records"
:executor executor
:workflow-ids workflow-ids})
(jdbc/query tx (format query details details workflow-ids))))

(defn update-retried-workflow-records
Expand Down Expand Up @@ -632,14 +621,15 @@
:status 400}
(into {} (map :data errors))))))))

;; Further work required to deal in generic entities
;; rather than assumed snapshot references:
;; https://broadinstitute.atlassian.net/browse/GH-1422
;;
(defn ^:private terra-executor-retry-workflows
"Resubmit the snapshot references associated with `workflows` in `workspace`
and update each original workflow record with the row ID of its retry."
[{{:keys [workspace] :as executor} :executor :as workload} workflows]
(letfn [(submit-reference [reference-id]
;; Further work required to deal in generic entities
;; rather than assumed snapshot references:
;; https://broadinstitute.atlassian.net/browse/GH-1422
(let [reference (rawls/get-snapshot-reference workspace reference-id)
userComment (create-user-comment "Retry" workload (get-in reference [:attributes :snapshot]))]
(->> reference
Expand All @@ -666,7 +656,6 @@
(defoverload load-executor! terra-executor-type load-terra-executor)
(defoverload update-executor! terra-executor-type update-terra-executor)
(defoverload executor-workflows terra-executor-type terra-executor-workflows)
(defoverload executor-workflows-by-filters terra-executor-type terra-executor-workflows-by-filters)
(defoverload executor-throw-if-invalid-retry-filters
terra-executor-type terra-executor-throw-if-invalid-retry-filters)
(defoverload executor-retry-workflows! terra-executor-type terra-executor-retry-workflows)
Expand All @@ -678,8 +667,6 @@

(defoverload util/to-edn terra-executor-type terra-executor-to-edn)

;; reitit http coercion specs for an executor
;; Recall s/or doesn't work (https://github.com/metosin/reitit/issues/494)
(s/def ::executor
#(condp = (:name %)
terra-executor-name (s/valid? ::terra-executor %)))
;; Generic executor-level specs following all implementations
(s/def ::executor ::terra-executor)
(s/def ::executor-workflow ::terra-executor-workflow)
16 changes: 12 additions & 4 deletions api/src/wfl/jdbc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@
(let [{:keys [line]} (meta &form)]
`(let [db# ~db
sql-params# ~sql-params]
(logger/debug (str/join " " ["jdbc/query:" (format-db db#) sql-params#]) ~line)
(logger/debug
(str/join " " ["jdbc/query:" (format-db db#) (pr-str sql-params#)])
~line)
(jdbc/query db# sql-params#))))
([db sql-params opts]
(let [{:keys [line]} (meta &form)]
`(let [db# ~db
sql-params# ~sql-params
opts# ~opts]
(logger/debug (str/join " " ["jdbc/query:" (format-db db#) sql-params# opts#]) ~line)
(logger/debug
(str/join " " ["jdbc/query:" (format-db db#) (pr-str sql-params#) opts#])
~line)
(jdbc/query db# sql-params# opts#)))))

(defmacro update!
Expand Down Expand Up @@ -88,14 +92,18 @@
(let [{:keys [line]} (meta &form)]
`(let [db# ~db
sql-params# ~sql-params]
(logger/info (str/join " " ["jdbc/execute!" (format-db db#) sql-params#]) ~line)
(logger/info
(str/join " " ["jdbc/execute!" (format-db db#) (pr-str sql-params#)])
~line)
(jdbc/execute! db# sql-params#))))
([db sql-params opts]
(let [{:keys [line]} (meta &form)]
`(let [db# ~db
sql-params# ~sql-params
opts# ~opts]
(logger/info (str/join " " ["jdbc/execute!" (format-db db#) sql-params# opts#]) ~line)
(logger/info
(str/join " " ["jdbc/execute!" (format-db db#) (pr-str sql-params#) opts#])
~line)
(jdbc/execute! db# sql-params# opts#)))))

(defmacro db-do-commands
Expand Down
3 changes: 0 additions & 3 deletions api/src/wfl/module/all.clj
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,8 @@
(s/def ::common map?)

(s/def ::entityType string?)
(s/def ::fromSource string?)
(s/def ::labels (s/* util/label?))
(s/def ::name string?)
(s/def ::methodConfiguration (s/and string? util/terra-namespaced-name?))
(s/def ::methodConfigurationVersion integer?)
(s/def ::watcher
(s/or :email slack/email-watcher?
:slack slack/slack-channel-watcher?))
Expand Down
2 changes: 1 addition & 1 deletion api/src/wfl/module/aou.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

(def workflow-wdl
"The top-level WDL file and its version."
{:release "Arrays_v2.6.2"
{:release "Arrays_v2.6.3"
:path "pipelines/broad/arrays/single_sample/Arrays.wdl"})

(def cromwell-label-map
Expand Down
6 changes: 6 additions & 0 deletions api/src/wfl/module/batch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
::all/uuid
::all/version]))

;; Spec for a batch workflow map.  `inputs` is the only required key;
;; `options`, `status`, `updated`, and `uuid` are optional.  All keys
;; are unqualified (`:req-un` / `:opt-un`).
(s/def ::workflow (s/keys :opt-un [::all/options
::all/status
::all/updated
::all/uuid]
:req-un [:wfl.api.spec/inputs]))

(defn ^:private cromwell-status
"`status` of the workflow with `uuid` in `cromwell`."
[cromwell uuid]
Expand Down
4 changes: 2 additions & 2 deletions api/src/wfl/module/covid.clj
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@
(defoverload workloads/load-workload-impl pipeline load-covid-workload-impl)
(defmethod workloads/workflows pipeline
[tx {:keys [executor] :as _workload}]
(executor/executor-workflows tx executor))
(executor/executor-workflows tx executor {}))
(defmethod workloads/workflows-by-filters pipeline
[tx {:keys [executor] :as _workload} filters]
(executor/executor-workflows-by-filters tx executor filters))
(executor/executor-workflows tx executor filters))
(defoverload workloads/throw-if-invalid-retry-filters
pipeline executor/executor-throw-if-invalid-retry-filters)
(defoverload workloads/retry pipeline retry-covid-workload)
Expand Down
Loading

0 comments on commit 031a762

Please sign in to comment.