diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e7953ff3..01ee89a72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# Release 0.18.0 +- [GH-1623] workloads/update-workload! should define own transactions ([#597](https://github.com/broadinstitute/wfl/pull/597)) +- [GH-1640] Broken Terra Submission links in completed workflow Slack messages ([#598](https://github.com/broadinstitute/wfl/pull/598)) +- [GH-1639] Pin jinja2 version for mkdocks compatibility ([#596](https://github.com/broadinstitute/wfl/pull/596)) +- [GH-1433] No TDR temporary snapshots in tests ([#595](https://github.com/broadinstitute/wfl/pull/595)) +- [GH-1619] Add workload info to sink logs ([#594](https://github.com/broadinstitute/wfl/pull/594)) +- [GH-1638] Disable link unfurling when Slacking watchers ([#593](https://github.com/broadinstitute/wfl/pull/593)) + # Release 0.17.0 - [GH-1592] [GH-1635] Notify Slack watchers on Terra submission creation ([#591](https://github.com/broadinstitute/wfl/pull/591)) - [GH-1593] Notify Slack watchers on failed TDR snapshot jobs ([#590](https://github.com/broadinstitute/wfl/pull/590)) @@ -410,4 +418,4 @@ - [GH-1013] Item Nesting V2 ([#188](https://github.com/broadinstitute/wfl/pull/188)) - [GH-1034] Fix reference_fasta function ([#194](https://github.com/broadinstitute/wfl/pull/194)) - [GH-771] WGS updates ([#191](https://github.com/broadinstitute/wfl/pull/191)) -- [GH-819] External Exome Reprocessing ([#139](https://github.com/broadinstitute/wfl/pull/139)) +- [GH-819] External Exome Reprocessing ([#139](https://github.com/broadinstitute/wfl/pull/139)) \ No newline at end of file diff --git a/api/src/wfl/api/workloads.clj b/api/src/wfl/api/workloads.clj index acb13ca63..71fa46adf 100644 --- a/api/src/wfl/api/workloads.clj +++ b/api/src/wfl/api/workloads.clj @@ -27,8 +27,8 @@ (fn [_transaction workload] (:pipeline workload))) (defmulti update-workload! - "(transaction workload) -> workload" - (fn [_transaction workload] (:pipeline workload))) + "workload-record -> workload" + :pipeline) (defmulti workflows "Use db `transaction` to return the workflows managed by the `workload`." @@ -148,10 +148,10 @@ (defmethod update-workload! :default - [_ {:keys [pipeline] :as workload}] + [{:keys [pipeline] :as workload-record}] (throw (ex-info "Failed to update workload - no such pipeline" - {:workload workload + {:workload workload-record :pipeline pipeline :type ::invalid-pipeline}))) diff --git a/api/src/wfl/executor.clj b/api/src/wfl/executor.clj index 3eff7945d..3e57f8dfc 100644 --- a/api/src/wfl/executor.clj +++ b/api/src/wfl/executor.clj @@ -299,7 +299,7 @@ workflow-link (-> workflow firecloud/workflow-url (slack/link workflow)) - submission-link (-> workspace + submission-link (-> submission (firecloud/submission-url workspace) (slack/link submission))] (str/join \newline diff --git a/api/src/wfl/module/aou.clj b/api/src/wfl/module/aou.clj index f2022b0f8..8ce40b81b 100644 --- a/api/src/wfl/module/aou.clj +++ b/api/src/wfl/module/aou.clj @@ -314,13 +314,18 @@ (defmethod workloads/update-workload! pipeline - [tx {:keys [started finished] :as workload}] - (letfn [(update! [{:keys [id] :as workload}] - (batch/update-workflow-statuses! tx workload) - (when (:stopped workload) - (batch/update-workload-status! tx workload)) - (workloads/load-workload-for-id tx id))] - (if (and started (not finished)) (update! workload) workload))) + [{:keys [id started stopped finished] :as _workload-record}] + (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] + (letfn [(load-workload [] + (workloads/load-workload-for-id tx id)) + (update! [workload] + (batch/update-workflow-statuses! tx workload) + (when stopped + (batch/update-workload-status! tx workload)) + (load-workload))] + (if (and started (not finished)) + (update! (load-workload)) + (load-workload))))) (defoverload workloads/workflows pipeline aou-workflows) (defoverload workloads/workflows-by-filters pipeline aou-workflows-by-filters) diff --git a/api/src/wfl/module/batch.clj b/api/src/wfl/module/batch.clj index 7b2b20af4..d3840444f 100644 --- a/api/src/wfl/module/batch.clj +++ b/api/src/wfl/module/batch.clj @@ -184,13 +184,18 @@ (mapcat submit-batch!)))) (defn update-workload! - "Use transaction TX to batch-update WORKLOAD statuses." - [tx {:keys [started finished] :as workload}] - (letfn [(update! [{:keys [id] :as workload}] - (batch-update-workflow-statuses! tx workload) - (update-workload-status! tx workload) - (workloads/load-workload-for-id tx id))] - (if (and started (not finished)) (update! workload) workload))) + "Batch-update `workload-record` statuses." + [{:keys [id started finished] :as _workload-record}] + (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] + (letfn [(load-workload [] + (workloads/load-workload-for-id tx id)) + (update! [workload] + (batch-update-workflow-statuses! tx workload) + (update-workload-status! tx workload) + (load-workload))] + (if (and started (not finished)) + (update! (load-workload)) + (load-workload))))) (defn stop-workload! "Use transaction TX to stop the WORKLOAD." diff --git a/api/src/wfl/module/sg.clj b/api/src/wfl/module/sg.clj index a6f1570a1..d3252c76b 100644 --- a/api/src/wfl/module/sg.clj +++ b/api/src/wfl/module/sg.clj @@ -1,8 +1,8 @@ (ns wfl.module.sg "Handle Somatic Genomes." (:require [clojure.data.json :as json] - [clojure.spec.alpha :as s] [clojure.set :as set] + [clojure.spec.alpha :as s] [clojure.string :as str] [wfl.api.workloads :as workloads :refer [defoverload]] [wfl.jdbc :as jdbc] @@ -13,6 +13,7 @@ [wfl.service.clio :as clio] [wfl.service.cromwell :as cromwell] [wfl.service.google.storage :as gcs] + [wfl.service.postgres :as postgres] [wfl.util :as util] [wfl.wfl :as wfl]) (:import [java.time OffsetDateTime])) @@ -210,19 +211,22 @@ (run! (partial register-workflow-in-clio workload output) workflows)) (defn update-sg-workload! - "Use transaction `tx` to batch-update `workload` statuses." - [tx {:keys [started finished] :as workload}] - (letfn [(update! [{:keys [id] :as workload}] - (batch/batch-update-workflow-statuses! tx workload) - (batch/update-workload-status! tx workload) - (workloads/load-workload-for-id tx id))] - (if (and started (not finished)) - (let [workload' (update! workload)] - (when (:finished workload') - (register-workload-in-clio workload' - (workloads/workflows tx workload'))) - workload') - workload))) + "Batch-update `workload-record` statuses." + [{:keys [id started finished] :as _workload-record}] + (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] + (letfn [(load-workload [] + (workloads/load-workload-for-id tx id)) + (update! [workload] + (batch/batch-update-workflow-statuses! tx workload) + (batch/update-workload-status! tx workload) + (load-workload))] + (if (and started (not finished)) + (let [{:keys [finished] :as updated} (update! (load-workload))] + (when finished + (register-workload-in-clio updated + (workloads/workflows tx updated))) + updated) + (load-workload))))) (defoverload workloads/create-workload! pipeline create-sg-workload!) (defoverload workloads/start-workload! pipeline start-sg-workload!) diff --git a/api/src/wfl/module/staged.clj b/api/src/wfl/module/staged.clj index 805917ac7..d68873e4e 100644 --- a/api/src/wfl/module/staged.clj +++ b/api/src/wfl/module/staged.clj @@ -125,18 +125,25 @@ (if-not started (start workload (utc-now)) workload))) (defn ^:private update-staged-workload - "Use transaction `tx` to update `workload` statuses." - [tx {:keys [started finished] :as workload}] - (letfn [(update! [{:keys [id source executor sink] :as workload} now] + "Update `workload-record` stages." + [{:keys [id started finished] :as _workload-record}] + (letfn [(load-workload [tx] + (workloads/load-workload-for-id tx id)) + (update! [{:keys [source executor sink] :as workload} now] (-> workload (source/update-source!) (executor/update-executor!) (sink/update-sink!)) - (patch-workload tx workload {:updated now}) - (when (every? stage/done? [source executor sink]) - (patch-workload tx workload {:finished now})) - (workloads/load-workload-for-id tx id))] - (if (and started (not finished)) (update! workload (utc-now)) workload))) + (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] + (patch-workload tx workload {:updated now}) + (when (every? stage/done? [source executor sink]) + (patch-workload tx workload {:finished now})) + (load-workload tx)))] + (let [workload (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] + (load-workload tx))] + (if (and started (not finished)) + (update! workload (utc-now)) + workload)))) (defn ^:private stop-staged-workload "Use transaction `tx` to stop the `workload` looking for new data." diff --git a/api/src/wfl/server.clj b/api/src/wfl/server.clj index 9af03e52e..07ba1efd4 100644 --- a/api/src/wfl/server.clj +++ b/api/src/wfl/server.clj @@ -79,28 +79,25 @@ (wrap-json-response {:pretty true}))) (defn ^:private do-update! - "Update `_workload` in a database transaction. " - [{:keys [id] :as _workload}] - (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] - (let [workload (workloads/load-workload-for-id tx id)] - (try - (workloads/update-workload! tx workload) - (catch UserException e - (log/warning "Error updating workload" - :workload (workloads/to-log workload) - :exception e) - (slack/notify-watchers workload (.getMessage e))))))) + "Update `workload-record` and notify watchers on UserException." + [workload-record] + (try + (workloads/update-workload! workload-record) + (catch UserException e + (log/warning "UserException while updating workload" + :workload (workloads/to-log workload-record) + :exception e) + (slack/notify-watchers workload-record (.getMessage e))))) (defn ^:private try-update - "Try to update the workflows in `workload` with a backstop." - [workload] + "Try to update `workload-record` with backstop." + [workload-record] (try - (log/info "Updating workload" - :workload (workloads/to-log workload)) - (do-update! workload) + (log/info "Updating workload" :workload (workloads/to-log workload-record)) + (do-update! workload-record) (catch Throwable t (log/error "Failed to update workload" - :workload (workloads/to-log workload) + :workload (workloads/to-log workload-record) :throwable t)))) (defn ^:private update-workloads @@ -108,12 +105,13 @@ [] (try (log/info "Finding workloads to update...") - (run! try-update - (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] - (jdbc/query tx (str/join \space - ["SELECT * FROM workload" - "WHERE started IS NOT NULL" - "AND finished IS NULL"])))) + (let [query (str/join \space + ["SELECT * FROM workload" + "WHERE started IS NOT NULL" + "AND finished IS NULL"]) + records (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] + (jdbc/query tx query))] + (run! try-update records)) (catch Throwable t (log/error "Failed to update workloads" :throwable t)))) diff --git a/api/src/wfl/service/slack.clj b/api/src/wfl/service/slack.clj index 0d4ee30a4..c76d5d7ef 100644 --- a/api/src/wfl/service/slack.clj +++ b/api/src/wfl/service/slack.clj @@ -32,10 +32,13 @@ ;; https://api.slack.com/methods/chat.postMessage#errors ;; (defn ^:private post-message - "Post `message` to `channel`." + "Post `message` to `channel` with link unfurling disabled." [channel message] (let [headers {:Authorization (str "Bearer " (env/getenv "WFL_SLACK_TOKEN"))} - body (json/write-str {:channel channel :text message})] + body (json/write-str {:channel channel + :text message + :unfurl_links false + :unfurl_media false})] (-> "https://slack.com/api/chat.postMessage" (http/post {:headers headers :content-type :application/json diff --git a/api/src/wfl/sink.clj b/api/src/wfl/sink.clj index 4c771f3ae..8a1854833 100644 --- a/api/src/wfl/sink.clj +++ b/api/src/wfl/sink.clj @@ -6,7 +6,7 @@ [clojure.set :as set] [clojure.spec.alpha :as s] [clojure.string :as str] - [wfl.api.workloads :refer [defoverload]] + [wfl.api.workloads :as workloads :refer [defoverload]] [wfl.environment :as env] [wfl.jdbc :as jdbc] [wfl.log :as log] @@ -199,11 +199,8 @@ "Write outputs from consumable `executor` workflows to `entityType` table in `workspace`." [{executor :executor - {:keys [fromOutputs workspace entityType details] :as sink} :sink :as _workload}] + {:keys [fromOutputs workspace entityType details] :as sink} :sink :as workload}] (when-let [[_ {:keys [uuid] :as workflow}] (stage/peek-queue executor)] - (log/debug {:action "Attempting to sink workflow outputs" - :workflow uuid - :entityType entityType}) (let [entityName (throw-or-entity-name-from-workflow workflow sink) entity [entityType entityName] attributes (terra-workspace-sink-to-attributes workflow fromOutputs)] @@ -211,9 +208,11 @@ (firecloud/delete-entities workspace [entity])) (rawls/batch-upsert workspace [(conj entity attributes)]) (stage/pop-queue! executor) - (log/debug {:action "Sunk workflow outputs" - :workflow uuid - :entity entity}) + (log/info "Sunk workflow outputs to Terra workspace" + :workload (workloads/to-log workload) + :workflow uuid + :workspace workspace + :entityType entityType) (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] (jdbc/insert! tx details {:entity entityName :updated (util/utc-now) @@ -421,7 +420,7 @@ (defn ^:private update-datarepo-sink "Pull a workflow off the `_workload`'s `executor` queue and write its outputs as new rows in a TDR dataset table." - [{:keys [executor sink uuid labels] :as _workload}] + [{:keys [executor sink] :as workload}] (when-let [[_ workflow] (stage/peek-queue executor)] (start-ingesting-outputs sink workflow) (stage/pop-queue! executor)) @@ -430,10 +429,12 @@ (try (let [result (datarepo/job-result job)] (if (< (:bad_row_count result) 1) - (log/info "Sunk workflow outputs to dataset" - :labels labels :workload uuid) - (throw (UserException. "Row failed to sink to dataset" - {:job job :workflow workflow})))) + (log/info "Sunk all workflow outputs to TDR dataset" + :workload (workloads/to-log workload) + :job-id job + :workflow workflow) + (throw (UserException. "Row failed to sink to TDR dataset" + {:job-id job :workflow workflow})))) (finally (pop-job-queue! sink record))))) diff --git a/api/test/wfl/integration/datarepo_test.clj b/api/test/wfl/integration/datarepo_test.clj index b95ba3c92..7327e1d82 100644 --- a/api/test/wfl/integration/datarepo_test.clj +++ b/api/test/wfl/integration/datarepo_test.clj @@ -88,17 +88,18 @@ (is (== 1 (count row-ids)) "Single input row should have been written to the dataset") (testing "creating snapshot after completed ingest" - (fixtures/with-temporary-snapshot - (snapshots/unique-snapshot-request tdr-profile - dataset - table-name - row-ids) - #(let [snapshot (datarepo/snapshot %) - expected-prefix (str (:name dataset) "_" table-name)] - (is (= % (:id snapshot))) - (is (str/starts-with? (:name snapshot) expected-prefix) - (str "Snapshot name should start with " - "dataset name and table name")))))))))))) + (let [request (snapshots/unique-snapshot-request + tdr-profile + dataset + table-name + row-ids) + snapshot-id (datarepo/create-snapshot request) + snapshot (datarepo/snapshot snapshot-id) + expected-prefix (str (:name dataset) "_" table-name)] + (is (= snapshot-id (:id snapshot))) + (is (str/starts-with? (:name snapshot) expected-prefix) + (str "Snapshot name should start with " + "dataset name and table name"))))))))))) (deftest test-flattened-query-result (let [samplesheets (-> (datarepo/snapshot (:id testing-snapshot)) diff --git a/api/test/wfl/tools/fixtures.clj b/api/test/wfl/tools/fixtures.clj index 0763a84ad..baf0b84ec 100644 --- a/api/test/wfl/tools/fixtures.clj +++ b/api/test/wfl/tools/fixtures.clj @@ -193,18 +193,6 @@ datarepo/delete-snapshots-then-dataset f)) -;; Recommendation: only call this within a temporary dataset. -;; If calling within a fixed dataset, running the same test concurrently -;; may fail due to exclusive dataset locking on snapshot deletion. -;; -(defn with-temporary-snapshot - "Create a temporary Terra Data Repository Snapshot with `snapshot-request`" - [snapshot-request f] - (util/bracket - #(datarepo/create-snapshot snapshot-request) - datarepo/delete-snapshot - f)) - (defn with-temporary-workspace "Create and use a temporary Terra Workspace." ([workspace-prefix group f] diff --git a/api/test/wfl/tools/workloads.clj b/api/test/wfl/tools/workloads.clj index 1325d9072..5059d0516 100644 --- a/api/test/wfl/tools/workloads.clj +++ b/api/test/wfl/tools/workloads.clj @@ -285,7 +285,7 @@ (def start-workload! (evalT wfl.api.workloads/start-workload!)) (def stop-workload! (evalT wfl.api.workloads/stop-workload!)) (def execute-workload! (evalT wfl.api.workloads/execute-workload!)) -(def update-workload! (evalT wfl.api.workloads/update-workload!)) +(def update-workload! wfl.api.workloads/update-workload!) (def workflows (evalT wfl.api.workloads/workflows)) (def workflows-by-filters (evalT wfl.api.workloads/workflows-by-filters)) diff --git a/docs/requirements.txt b/docs/requirements.txt index 45ce9af50..7a2bbc980 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,4 @@ pymdown-extensions==8.0.1 markdown-include==0.6.0 mkdocs-material==7.2.4 +jinja2==3.0.3 diff --git a/version b/version index c5523bd09..66333910a 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.17.0 +0.18.0