[GH-1402] Write Workflow Outputs to the Terra Data Repository (#474)
* [GH-1405] Add Schema for TerraDataRepoSink (#473)

RR: https://broadinstitute.atlassian.net/browse/GH-1405
Add schemas for the following types and for the TerraDataRepoSink table:
- TDRJobStatus
- TDRJobType
- TerraDataRepoSinkDetails

* [GH-1314] Start Implementing DataRepo Sink (#476)

RR: https://broadinstitute.atlassian.net/browse/GH-1413
Add load/create functions for the data repo sink, leaving update as "unimplemented".

* [GH-1415] Implement `update-sink` for TerraDataRepoSink (#478)

RR: https://broadinstitute.atlassian.net/browse/GH-1415

This change contains a rough implementation of the TerraDataRepo sink update functionality. After discovering certain hurdles in the original design, I've made some tweaks and incorporated TDR's work (https://broadworkbench.atlassian.net/browse/DR-1960) so that we only need one ingest request instead of separately loading the output files and output metadata. At the time of writing, DR-1960 is a work in progress, so our ingests won't work just yet.

Some things to call out in this change that I'm deferring to a follow-up change to the feature branch:

- I need to upload the JSON file for TDR to ingest. I'm currently using our test outputs bucket as a temporary folder for this, which is obviously not a production-ready solution. Possible fixes include creating a scratch bucket for workflow-launcher, creating a bucket per workload, or using the executor's execution bucket. Suggestions welcome.
- Since the ingests always fail, I'm leaving some of the post-ingest work as a TODO.

* [GH-1425] Expose `TerraDataRepoSink` Via the HTTP API (#484)

RR: https://broadinstitute.atlassian.net/browse/GH-1425
Add specs and register them in wfl.api/spec.
Add an end-to-end system test for reading workflow inputs from, and writing workflow outputs to, TDR (using the illumina_genotyping_array pipeline because it's smaller than sarscov2_illumina_full).

Co-authored-by: Chengchen(Rex) Wang <14366016+rexwangcc@users.noreply.github.com>

* [GH-1416] Document DataRepo Sink (#485)

RR: https://broadinstitute.atlassian.net/browse/GH-1416
Add a section to `sink.md` describing how to configure workflow-launcher
to write outputs back to a Terra Data Repo dataset.

* [GH-1430] Remove `stage/validate-or-throw` and inline their implementations into `create-X`. (#490)

RR: https://broadinstitute.atlassian.net/browse/GH-1430
Remove `stage/validate-or-throw` and inline its implementations in their respective `create-X` functions.

This is done because:
- we were losing context (i.e., was this a source, sink, or executor?) and had a collision between the TDR sink and source implementations
- these multimethods didn't really make sense and require some redesign work

* address @okotsopoulos's feedback

Co-authored-by: Chengchen(Rex) Wang <14366016+rexwangcc@users.noreply.github.com>
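
The collision mentioned under GH-1430 can be sketched roughly as follows. This is a hypothetical, self-contained illustration and not the project's code: the stage name "Terra DataRepo", the request shape, and the `:validated-as` key are assumptions, chosen only to show why one multimethod dispatching on `:name` cannot tell a source from a sink, whereas per-kind `create-source!`/`create-sink!`-style multimethods can.

```clojure
;; Hypothetical sketch: names and request shapes are assumptions.
(defmulti validate-or-throw
  "One multimethod shared by every stage, dispatching on :name alone."
  :name)

;; A source registers an implementation under the name "Terra DataRepo" ...
(defmethod validate-or-throw "Terra DataRepo" [request]
  (assoc request :validated-as :source))

;; ... and a sink registering the same name silently replaces it.
(defmethod validate-or-throw "Terra DataRepo" [request]
  (assoc request :validated-as :sink))

;; Validating inside per-kind multimethods keeps the lost context.
(defmulti create-source! :name)
(defmulti create-sink! :name)

(defmethod create-source! "Terra DataRepo" [request]
  (assoc request :validated-as :source))

(defmethod create-sink! "Terra DataRepo" [request]
  (assoc request :validated-as :sink))

(def request {:name "Terra DataRepo" :dataset "hypothetical-dataset-id"})
```

With the shared multimethod, a source request is validated by whichever implementation was registered last; with separate multimethods, the caller's choice of `create-source!` or `create-sink!` carries the context.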
ehigham and rexwangcc authored Aug 13, 2021
1 parent 83677c2 commit a15e803
Showing 37 changed files with 1,160 additions and 447 deletions.
4 changes: 2 additions & 2 deletions api/src/wfl/api/handlers.clj
@@ -1,11 +1,11 @@
(ns wfl.api.handlers
"Define handlers for API endpoints. Require wfl.module namespaces here."
(:require [clojure.set :refer [rename-keys]]
[wfl.log :as log]
[wfl.configuration :as config]
[ring.util.http-response :as response]
[wfl.api.workloads :as workloads]
[wfl.configuration :as config]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.module.aou :as aou]
[wfl.module.copyfile]
[wfl.module.covid]
4 changes: 2 additions & 2 deletions api/src/wfl/api/routes.clj
@@ -1,7 +1,6 @@
(ns wfl.api.routes
"Define routes for API endpoints."
(:require [clojure.string :as str]
[wfl.log :as log]
[muuntaja.core :as muuntaja-core]
[reitit.coercion.spec]
[reitit.ring :as ring]
@@ -12,9 +11,10 @@
[reitit.ring.middleware.parameters :as parameters]
[reitit.swagger :as swagger]
[wfl.api.handlers :as handlers]
[wfl.api.spec :as spec]
[wfl.api.workloads :as workloads]
[wfl.environment :as env]
[wfl.api.spec :as spec]
[wfl.log :as log]
[wfl.module.all :as all]
[wfl.module.aou :as aou]
[wfl.util :as util]
2 changes: 1 addition & 1 deletion api/src/wfl/environment.clj
@@ -1,8 +1,8 @@
(ns wfl.environment
"Map environment to various values here."
(:require [clojure.data.json :as json]
[wfl.log :as log]
[clojure.string :as str]
[wfl.log :as log]
[vault.client.http] ; vault.core needs this
[vault.core :as vault]))

33 changes: 21 additions & 12 deletions api/src/wfl/executor.clj
@@ -59,23 +59,32 @@
association in the `workload`."
(fn [_transaction workload] (:executor_type workload)))

(defmethod create-executor! :default
[_ _ {:keys [name] :as request}]
(throw (UserException.
"Invalid executor name"
{:name name
:request request
:status 400
:executors (-> create-executor! methods (dissoc :default) keys)})))

;; Terra Executor
(def ^:private terra-executor-name "Terra")
(def ^:private terra-executor-type "TerraExecutor")
(def ^:private terra-executor-table "TerraExecutor")
(def ^:private terra-executor-serialized-fields
(def ^:private ^:const terra-executor-name "Terra")
(def ^:private ^:const terra-executor-type "TerraExecutor")
(def ^:private ^:const terra-executor-table "TerraExecutor")
(def ^:private ^:const terra-executor-serialized-fields
{:workspace :workspace
:methodConfiguration :method_configuration
:methodConfigurationVersion :method_configuration_version
:fromSource :from_source})

(defn ^:private create-terra-executor [tx id request]
(defn ^:private write-terra-executor [tx id executor]
(let [create "CREATE TABLE %s OF TerraExecutorDetails (PRIMARY KEY (id))"
alter "ALTER TABLE %s ALTER COLUMN id ADD GENERATED ALWAYS AS IDENTITY"
details (format "%s_%09d" terra-executor-type id)]
(jdbc/db-do-commands tx [(format create details) (format alter details)])
[terra-executor-type
(-> (select-keys request (keys terra-executor-serialized-fields))
(-> (select-keys executor (keys terra-executor-serialized-fields))
(update :fromSource pr-str)
(set/rename-keys terra-executor-serialized-fields)
(assoc :details details)
@@ -105,7 +114,7 @@
(throw (UserException. "Only Dockstore methods are supported."
{:status 400 :methodRepoMethod methodRepoMethod}))))

(defn verify-terra-executor
(defn terra-executor-validate-request-or-throw
"Verify the method-configuration exists."
[{:keys [skipValidation
workspace
@@ -537,19 +546,19 @@
(util/select-non-nil-keys (keys terra-executor-serialized-fields))
(assoc :name terra-executor-name)))

(defoverload create-executor! terra-executor-name create-terra-executor)
(defoverload load-executor! terra-executor-type load-terra-executor)
(defmethod create-executor! terra-executor-name
[tx id request]
(write-terra-executor tx id (terra-executor-validate-request-or-throw request)))

(defoverload load-executor! terra-executor-type load-terra-executor)
(defoverload update-executor! terra-executor-type update-terra-executor)
(defoverload executor-workflows terra-executor-type terra-executor-workflows)
(defoverload executor-workflows-by-status terra-executor-type terra-executor-workflows-by-status)
(defoverload executor-retry-workflows! terra-executor-type terra-executor-retry-workflows)

(defoverload stage/validate-or-throw terra-executor-name verify-terra-executor)
(defoverload stage/done? terra-executor-type terra-executor-done?)

(defoverload stage/peek-queue terra-executor-type peek-terra-executor-queue)
(defoverload stage/pop-queue! terra-executor-type pop-terra-executor-queue)
(defoverload stage/queue-length terra-executor-type terra-executor-queue-length)
(defoverload stage/done? terra-executor-type terra-executor-done?)

(defoverload util/to-edn terra-executor-type terra-executor-to-edn)
30 changes: 16 additions & 14 deletions api/src/wfl/module/all.clj
@@ -1,12 +1,12 @@
(ns wfl.module.all
"Some utilities shared across module namespaces."
(:require [clojure.spec.alpha :as s]
[clojure.string :as str]
[wfl.jdbc :as jdbc]
[wfl.service.cromwell :as cromwell]
(:require [clojure.spec.alpha :as s]
[clojure.string :as str]
[wfl.jdbc :as jdbc]
[wfl.service.cromwell :as cromwell]
[wfl.service.google.storage :as gcs]
[wfl.util :as util]
[wfl.wfl :as wfl])
[wfl.util :as util]
[wfl.wfl :as wfl])
(:import [java.util UUID]))

(defn throw-when-output-exists-already!
@@ -90,6 +90,14 @@
(jdbc/db-do-commands tx [work])
[id table]))

(defn has?
"Return a function that takes a map and returns the result of applying the
`predicate?` to the value at the `key` in `map`, if one exists."
[key predicate?]
(fn [map]
(when-let [value (get map key)]
(predicate? value))))
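
A brief illustration of how `has?` behaves, outside the diff itself. The definition is restated so the snippet runs on its own; the example maps, the `:name` key, and the `has-string-name?` helper are made up for illustration and do not come from the codebase. Note that because of `when-let`, a missing key (or a `nil`/`false` value) yields `nil` rather than `false`.

```clojure
;; Restated from the diff above so this snippet is self-contained.
(defn has?
  [key predicate?]
  (fn [map]
    (when-let [value (get map key)]
      (predicate? value))))

;; Hypothetical helper and inputs, for illustration only.
(def has-string-name? (has? :name string?))

(has-string-name? {:name "Terra DataRepo"}) ; key present, predicate holds
(has-string-name? {:name 42})               ; key present, predicate fails
(has-string-name? {})                       ; key absent, so when-let yields nil
```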

;; shared specs
(s/def ::base_file_name string?)
(s/def ::commit (s/and string? (comp (partial == 40) count)))
@@ -100,6 +108,7 @@
(s/def ::timestamp (s/or :instant inst? :datetime util/datetime-string?))
(s/def ::created ::timestamp)
(s/def ::cromwell string?)
(s/def ::dataset string?)
(s/def ::dbsnp_vcf string?)
(s/def ::dbsnp_vcf_index string?)
(s/def ::environment string?)
@@ -114,6 +123,7 @@
(s/def ::status cromwell/status?)
(s/def ::started ::timestamp)
(s/def ::stopped ::timestamp)
(s/def ::table string?)
(s/def ::updated ::timestamp)
(s/def ::uuid util/uuid-string?)
(s/def ::uuid-kv (s/keys :req-un [::uuid]))
@@ -122,15 +132,7 @@
(s/def ::options map?)
(s/def ::common map?)

(s/def ::sink (s/keys :req-un [::name
::entityType
::fromOutputs
::identifier
::workspace]))

(s/def ::entityType string?)
(s/def ::identifier string?)
(s/def ::fromOutputs map?)
(s/def ::fromSource string?)
(s/def ::labels (s/* util/label?))
(s/def ::name string?)
6 changes: 3 additions & 3 deletions api/src/wfl/module/aou.clj
@@ -2,17 +2,17 @@
"Process Arrays for the All Of Us project."
(:require [clojure.string :as str]
[clojure.spec.alpha :as s]
[wfl.log :as log]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.module.all :as all]
[wfl.module.batch :as batch]
[wfl.references :as references]
[wfl.service.cromwell :as cromwell]
[wfl.service.google.storage :as gcs]
[wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wfl :as wfl]
[wfl.module.all :as all])
[wfl.wfl :as wfl])
(:import [java.sql Timestamp]
[java.time OffsetDateTime]
[java.util UUID]))
12 changes: 6 additions & 6 deletions api/src/wfl/module/batch.clj
@@ -6,12 +6,13 @@
[wfl.api.workloads :as workloads]
[wfl.auth :as auth]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
[wfl.service.cromwell :as cromwell]
[wfl.service.postgres :as postgres]
[wfl.sink :as sink]
[wfl.source :as source]
[wfl.util :as util]
[wfl.wfl :as wfl]
[wfl.module.all :as all]
[wfl.source :as source])
[wfl.wfl :as wfl])
(:import [java.time OffsetDateTime]
[java.util UUID]
[wfl.util UserException]))
@@ -31,7 +32,7 @@
:wfl.api.spec/items
::all/labels
::all/output
::all/sink
::sink/sink
::source/source
::all/watchers]
:req-un [(or ::all/cromwell ::executor)
@@ -221,8 +222,7 @@
(defn query-workflows-with-status
"Return the workflows in the items `table` that match `status`."
[tx table status]
(when-not (and table (postgres/table-exists? tx table))
(throw (ex-info "Table not found" {:table table})))
(postgres/throw-unless-table-exists tx table)
(let [query-str "SELECT * FROM %s WHERE status = ? ORDER BY id ASC"]
(jdbc/query tx [(format query-str table) status])))

27 changes: 13 additions & 14 deletions api/src/wfl/module/covid.clj
@@ -1,16 +1,16 @@
(ns wfl.module.covid
"Manage the Sarscov2IlluminaFull pipeline."
(:require [wfl.api.workloads :as workloads :refer [defoverload]]
[clojure.spec.alpha :as s]
[wfl.executor :as executor]
[wfl.jdbc :as jdbc]
(:require [clojure.spec.alpha :as s]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.executor :as executor]
[wfl.jdbc :as jdbc]
[wfl.module.all :as all]
[wfl.service.postgres :as postgres]
[wfl.sink :as sink]
[wfl.source :as source]
[wfl.stage :as stage]
[wfl.util :as util :refer [utc-now]]
[wfl.wfl :as wfl]
[wfl.module.all :as all])
[wfl.sink :as sink]
[wfl.source :as source]
[wfl.stage :as stage]
[wfl.util :as util :refer [utc-now]]
[wfl.wfl :as wfl])
(:import [java.util UUID]
[wfl.util UserException]))

@@ -25,7 +25,7 @@
(s/def ::creator util/email-address?)
(s/def ::workload-request (s/keys :req-un [::executor
::all/project
::all/sink
::sink/sink
::source/source]
:opt-un [::all/labels
::all/watchers]))
@@ -34,7 +34,7 @@
::creator
::executor
::all/labels
::all/sink
::sink/sink
::source/source
::all/uuid
::all/version
@@ -95,8 +95,7 @@

(defn ^:private create-covid-workload
[tx {:keys [source executor sink] :as request}]
(let [[source executor sink] (mapv stage/validate-or-throw [source executor sink])
id (add-workload-metadata tx request)]
(let [id (add-workload-metadata tx request)]
(jdbc/execute!
tx
(concat [update-workload-query]
6 changes: 3 additions & 3 deletions api/src/wfl/module/sg.clj
@@ -4,17 +4,17 @@
[clojure.spec.alpha :as s]
[clojure.set :as set]
[clojure.string :as str]
[wfl.log :as log]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.module.all :as all]
[wfl.module.batch :as batch]
[wfl.references :as references]
[wfl.service.clio :as clio]
[wfl.service.cromwell :as cromwell]
[wfl.service.google.storage :as gcs]
[wfl.util :as util]
[wfl.wfl :as wfl]
[wfl.module.all :as all])
[wfl.wfl :as wfl])
(:import [java.time OffsetDateTime]))

(def pipeline "GDCWholeGenomeSomaticSingleSample")
6 changes: 3 additions & 3 deletions api/src/wfl/server.clj
@@ -1,7 +1,6 @@
(ns wfl.server
"An HTTP API server."
(:require [clojure.string :as str]
[wfl.log :as log]
[clj-time.coerce :as tc]
[ring.adapter.jetty :as jetty]
[ring.middleware.defaults :as defaults]
@@ -11,12 +10,13 @@
[ring.middleware.session.cookie :as cookie]
[wfl.api.routes :as routes]
[wfl.api.workloads :as workloads]
[wfl.configuration :as config]
[wfl.environment :as env]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wfl :as wfl]
[wfl.configuration :as config])
[wfl.wfl :as wfl])
(:import (java.util.concurrent Future TimeUnit)
(wfl.util UserException)))
