Skip to content

Commit

Permalink
[GH-1641] WFL Release v0.18.0 (#599)
Browse files Browse the repository at this point in the history
GH-1623 workloads/update-workload! should define own transactions (#597)
GH-1640 Broken Terra Submission links in completed workflow Slack messages (#598)
GH-1639 Pin jinja2 version for mkdocks compatibility (#596)
GH-1433 No TDR temporary snapshots in tests (#595)
GH-1619 Add workload info to sink logs (#594)
GH-1638 Disable link unfurling when Slacking watchers (#593)
  • Loading branch information
okotsopoulos authored Apr 1, 2022
1 parent 5de2209 commit 8b59914
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 105 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Release 0.18.0
- [GH-1623] workloads/update-workload! should define own transactions ([#597](https://github.com/broadinstitute/wfl/pull/597))
- [GH-1640] Broken Terra Submission links in completed workflow Slack messages ([#598](https://github.com/broadinstitute/wfl/pull/598))
- [GH-1639] Pin jinja2 version for mkdocks compatibility ([#596](https://github.com/broadinstitute/wfl/pull/596))
- [GH-1433] No TDR temporary snapshots in tests ([#595](https://github.com/broadinstitute/wfl/pull/595))
- [GH-1619] Add workload info to sink logs ([#594](https://github.com/broadinstitute/wfl/pull/594))
- [GH-1638] Disable link unfurling when Slacking watchers ([#593](https://github.com/broadinstitute/wfl/pull/593))

# Release 0.17.0
- [GH-1592] [GH-1635] Notify Slack watchers on Terra submission creation ([#591](https://github.com/broadinstitute/wfl/pull/591))
- [GH-1593] Notify Slack watchers on failed TDR snapshot jobs ([#590](https://github.com/broadinstitute/wfl/pull/590))
Expand Down Expand Up @@ -410,4 +418,4 @@
- [GH-1013] Item Nesting V2 ([#188](https://github.com/broadinstitute/wfl/pull/188))
- [GH-1034] Fix reference_fasta function ([#194](https://github.com/broadinstitute/wfl/pull/194))
- [GH-771] WGS updates ([#191](https://github.com/broadinstitute/wfl/pull/191))
- [GH-819] External Exome Reprocessing ([#139](https://github.com/broadinstitute/wfl/pull/139))
- [GH-819] External Exome Reprocessing ([#139](https://github.com/broadinstitute/wfl/pull/139))
8 changes: 4 additions & 4 deletions api/src/wfl/api/workloads.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
(fn [_transaction workload] (:pipeline workload)))

(defmulti update-workload!
"(transaction workload) -> workload"
(fn [_transaction workload] (:pipeline workload)))
"workload-record -> workload"
:pipeline)

(defmulti workflows
"Use db `transaction` to return the workflows managed by the `workload`."
Expand Down Expand Up @@ -148,10 +148,10 @@

(defmethod update-workload!
:default
[_ {:keys [pipeline] :as workload}]
[{:keys [pipeline] :as workload-record}]
(throw
(ex-info "Failed to update workload - no such pipeline"
{:workload workload
{:workload workload-record
:pipeline pipeline
:type ::invalid-pipeline})))

Expand Down
2 changes: 1 addition & 1 deletion api/src/wfl/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@
workflow-link (-> workflow
firecloud/workflow-url
(slack/link workflow))
submission-link (-> workspace
submission-link (-> submission
(firecloud/submission-url workspace)
(slack/link submission))]
(str/join \newline
Expand Down
19 changes: 12 additions & 7 deletions api/src/wfl/module/aou.clj
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,18 @@

(defmethod workloads/update-workload!
pipeline
[tx {:keys [started finished] :as workload}]
(letfn [(update! [{:keys [id] :as workload}]
(batch/update-workflow-statuses! tx workload)
(when (:stopped workload)
(batch/update-workload-status! tx workload))
(workloads/load-workload-for-id tx id))]
(if (and started (not finished)) (update! workload) workload)))
[{:keys [id started stopped finished] :as _workload-record}]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(letfn [(load-workload []
(workloads/load-workload-for-id tx id))
(update! [workload]
(batch/update-workflow-statuses! tx workload)
(when stopped
(batch/update-workload-status! tx workload))
(load-workload))]
(if (and started (not finished))
(update! (load-workload))
(load-workload)))))

(defoverload workloads/workflows pipeline aou-workflows)
(defoverload workloads/workflows-by-filters pipeline aou-workflows-by-filters)
Expand Down
19 changes: 12 additions & 7 deletions api/src/wfl/module/batch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,18 @@
(mapcat submit-batch!))))

(defn update-workload!
"Use transaction TX to batch-update WORKLOAD statuses."
[tx {:keys [started finished] :as workload}]
(letfn [(update! [{:keys [id] :as workload}]
(batch-update-workflow-statuses! tx workload)
(update-workload-status! tx workload)
(workloads/load-workload-for-id tx id))]
(if (and started (not finished)) (update! workload) workload)))
"Batch-update `workload-record` statuses."
[{:keys [id started finished] :as _workload-record}]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(letfn [(load-workload []
(workloads/load-workload-for-id tx id))
(update! [workload]
(batch-update-workflow-statuses! tx workload)
(update-workload-status! tx workload)
(load-workload))]
(if (and started (not finished))
(update! (load-workload))
(load-workload)))))

(defn stop-workload!
"Use transaction TX to stop the WORKLOAD."
Expand Down
32 changes: 18 additions & 14 deletions api/src/wfl/module/sg.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
(ns wfl.module.sg
"Handle Somatic Genomes."
(:require [clojure.data.json :as json]
[clojure.spec.alpha :as s]
[clojure.set :as set]
[clojure.spec.alpha :as s]
[clojure.string :as str]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
Expand All @@ -13,6 +13,7 @@
[wfl.service.clio :as clio]
[wfl.service.cromwell :as cromwell]
[wfl.service.google.storage :as gcs]
[wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wfl :as wfl])
(:import [java.time OffsetDateTime]))
Expand Down Expand Up @@ -210,19 +211,22 @@
(run! (partial register-workflow-in-clio workload output) workflows))

(defn update-sg-workload!
"Use transaction `tx` to batch-update `workload` statuses."
[tx {:keys [started finished] :as workload}]
(letfn [(update! [{:keys [id] :as workload}]
(batch/batch-update-workflow-statuses! tx workload)
(batch/update-workload-status! tx workload)
(workloads/load-workload-for-id tx id))]
(if (and started (not finished))
(let [workload' (update! workload)]
(when (:finished workload')
(register-workload-in-clio workload'
(workloads/workflows tx workload')))
workload')
workload)))
"Batch-update `workload-record` statuses."
[{:keys [id started finished] :as _workload-record}]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(letfn [(load-workload []
(workloads/load-workload-for-id tx id))
(update! [workload]
(batch/batch-update-workflow-statuses! tx workload)
(batch/update-workload-status! tx workload)
(load-workload))]
(if (and started (not finished))
(let [{:keys [finished] :as updated} (update! (load-workload))]
(when finished
(register-workload-in-clio updated
(workloads/workflows tx updated)))
updated)
(load-workload)))))

(defoverload workloads/create-workload! pipeline create-sg-workload!)
(defoverload workloads/start-workload! pipeline start-sg-workload!)
Expand Down
23 changes: 15 additions & 8 deletions api/src/wfl/module/staged.clj
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,25 @@
(if-not started (start workload (utc-now)) workload)))

(defn ^:private update-staged-workload
"Use transaction `tx` to update `workload` statuses."
[tx {:keys [started finished] :as workload}]
(letfn [(update! [{:keys [id source executor sink] :as workload} now]
"Update `workload-record` stages."
[{:keys [id started finished] :as _workload-record}]
(letfn [(load-workload [tx]
(workloads/load-workload-for-id tx id))
(update! [{:keys [source executor sink] :as workload} now]
(-> workload
(source/update-source!)
(executor/update-executor!)
(sink/update-sink!))
(patch-workload tx workload {:updated now})
(when (every? stage/done? [source executor sink])
(patch-workload tx workload {:finished now}))
(workloads/load-workload-for-id tx id))]
(if (and started (not finished)) (update! workload (utc-now)) workload)))
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(patch-workload tx workload {:updated now})
(when (every? stage/done? [source executor sink])
(patch-workload tx workload {:finished now}))
(load-workload tx)))]
(let [workload (jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(load-workload tx))]
(if (and started (not finished))
(update! workload (utc-now))
workload))))

(defn ^:private stop-staged-workload
"Use transaction `tx` to stop the `workload` looking for new data."
Expand Down
44 changes: 21 additions & 23 deletions api/src/wfl/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -79,41 +79,39 @@
(wrap-json-response {:pretty true})))

(defn ^:private do-update!
"Update `_workload` in a database transaction. "
[{:keys [id] :as _workload}]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(let [workload (workloads/load-workload-for-id tx id)]
(try
(workloads/update-workload! tx workload)
(catch UserException e
(log/warning "Error updating workload"
:workload (workloads/to-log workload)
:exception e)
(slack/notify-watchers workload (.getMessage e)))))))
"Update `workload-record` and notify watchers on UserException."
[workload-record]
(try
(workloads/update-workload! workload-record)
(catch UserException e
(log/warning "UserException while updating workload"
:workload (workloads/to-log workload-record)
:exception e)
(slack/notify-watchers workload-record (.getMessage e)))))

(defn ^:private try-update
"Try to update the workflows in `workload` with a backstop."
[workload]
"Try to update `workload-record` with backstop."
[workload-record]
(try
(log/info "Updating workload"
:workload (workloads/to-log workload))
(do-update! workload)
(log/info "Updating workload" :workload (workloads/to-log workload-record))
(do-update! workload-record)
(catch Throwable t
(log/error "Failed to update workload"
:workload (workloads/to-log workload)
:workload (workloads/to-log workload-record)
:throwable t))))

(defn ^:private update-workloads
"Update the active workflows in the active workloads."
[]
(try
(log/info "Finding workloads to update...")
(run! try-update
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(jdbc/query tx (str/join \space
["SELECT * FROM workload"
"WHERE started IS NOT NULL"
"AND finished IS NULL"]))))
(let [query (str/join \space
["SELECT * FROM workload"
"WHERE started IS NOT NULL"
"AND finished IS NULL"])
records (jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(jdbc/query tx query))]
(run! try-update records))
(catch Throwable t
(log/error "Failed to update workloads" :throwable t))))

Expand Down
7 changes: 5 additions & 2 deletions api/src/wfl/service/slack.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
;; https://api.slack.com/methods/chat.postMessage#errors
;;
(defn ^:private post-message
"Post `message` to `channel`."
"Post `message` to `channel` with link unfurling disabled."
[channel message]
(let [headers {:Authorization (str "Bearer " (env/getenv "WFL_SLACK_TOKEN"))}
body (json/write-str {:channel channel :text message})]
body (json/write-str {:channel channel
:text message
:unfurl_links false
:unfurl_media false})]
(-> "https://slack.com/api/chat.postMessage"
(http/post {:headers headers
:content-type :application/json
Expand Down
27 changes: 14 additions & 13 deletions api/src/wfl/sink.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
[clojure.set :as set]
[clojure.spec.alpha :as s]
[clojure.string :as str]
[wfl.api.workloads :refer [defoverload]]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.environment :as env]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
Expand Down Expand Up @@ -199,21 +199,20 @@
"Write outputs from consumable `executor` workflows
to `entityType` table in `workspace`."
[{executor :executor
{:keys [fromOutputs workspace entityType details] :as sink} :sink :as _workload}]
{:keys [fromOutputs workspace entityType details] :as sink} :sink :as workload}]
(when-let [[_ {:keys [uuid] :as workflow}] (stage/peek-queue executor)]
(log/debug {:action "Attempting to sink workflow outputs"
:workflow uuid
:entityType entityType})
(let [entityName (throw-or-entity-name-from-workflow workflow sink)
entity [entityType entityName]
attributes (terra-workspace-sink-to-attributes workflow fromOutputs)]
(when (entity-exists? workspace entity)
(firecloud/delete-entities workspace [entity]))
(rawls/batch-upsert workspace [(conj entity attributes)])
(stage/pop-queue! executor)
(log/debug {:action "Sunk workflow outputs"
:workflow uuid
:entity entity})
(log/info "Sunk workflow outputs to Terra workspace"
:workload (workloads/to-log workload)
:workflow uuid
:workspace workspace
:entityType entityType)
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(jdbc/insert! tx details {:entity entityName
:updated (util/utc-now)
Expand Down Expand Up @@ -421,7 +420,7 @@
(defn ^:private update-datarepo-sink
"Pull a workflow off the `_workload`'s `executor` queue and write its
outputs as new rows in a TDR dataset table."
[{:keys [executor sink uuid labels] :as _workload}]
[{:keys [executor sink] :as workload}]
(when-let [[_ workflow] (stage/peek-queue executor)]
(start-ingesting-outputs sink workflow)
(stage/pop-queue! executor))
Expand All @@ -430,10 +429,12 @@
(try
(let [result (datarepo/job-result job)]
(if (< (:bad_row_count result) 1)
(log/info "Sunk workflow outputs to dataset"
:labels labels :workload uuid)
(throw (UserException. "Row failed to sink to dataset"
{:job job :workflow workflow}))))
(log/info "Sunk all workflow outputs to TDR dataset"
:workload (workloads/to-log workload)
:job-id job
:workflow workflow)
(throw (UserException. "Row failed to sink to TDR dataset"
{:job-id job :workflow workflow}))))
(finally
(pop-job-queue! sink record)))))

Expand Down
23 changes: 12 additions & 11 deletions api/test/wfl/integration/datarepo_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,18 @@
(is (== 1 (count row-ids))
"Single input row should have been written to the dataset")
(testing "creating snapshot after completed ingest"
(fixtures/with-temporary-snapshot
(snapshots/unique-snapshot-request tdr-profile
dataset
table-name
row-ids)
#(let [snapshot (datarepo/snapshot %)
expected-prefix (str (:name dataset) "_" table-name)]
(is (= % (:id snapshot)))
(is (str/starts-with? (:name snapshot) expected-prefix)
(str "Snapshot name should start with "
"dataset name and table name"))))))))))))
(let [request (snapshots/unique-snapshot-request
tdr-profile
dataset
table-name
row-ids)
snapshot-id (datarepo/create-snapshot request)
snapshot (datarepo/snapshot snapshot-id)
expected-prefix (str (:name dataset) "_" table-name)]
(is (= snapshot-id (:id snapshot)))
(is (str/starts-with? (:name snapshot) expected-prefix)
(str "Snapshot name should start with "
"dataset name and table name")))))))))))

(deftest test-flattened-query-result
(let [samplesheets (-> (datarepo/snapshot (:id testing-snapshot))
Expand Down
12 changes: 0 additions & 12 deletions api/test/wfl/tools/fixtures.clj
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,6 @@
datarepo/delete-snapshots-then-dataset
f))

;; Recommendation: only call this within a temporary dataset.
;; If calling within a fixed dataset, running the same test concurrently
;; may fail due to exclusive dataset locking on snapshot deletion.
;;
(defn with-temporary-snapshot
"Create a temporary Terra Data Repository Snapshot with `snapshot-request`"
[snapshot-request f]
(util/bracket
#(datarepo/create-snapshot snapshot-request)
datarepo/delete-snapshot
f))

(defn with-temporary-workspace
"Create and use a temporary Terra Workspace."
([workspace-prefix group f]
Expand Down
2 changes: 1 addition & 1 deletion api/test/wfl/tools/workloads.clj
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@
(def start-workload! (evalT wfl.api.workloads/start-workload!))
(def stop-workload! (evalT wfl.api.workloads/stop-workload!))
(def execute-workload! (evalT wfl.api.workloads/execute-workload!))
(def update-workload! (evalT wfl.api.workloads/update-workload!))
(def update-workload! wfl.api.workloads/update-workload!)
(def workflows (evalT wfl.api.workloads/workflows))
(def workflows-by-filters (evalT wfl.api.workloads/workflows-by-filters))

Expand Down
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pymdown-extensions==8.0.1
markdown-include==0.6.0
mkdocs-material==7.2.4
jinja2==3.0.3
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.17.0
0.18.0

0 comments on commit 8b59914

Please sign in to comment.