Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GH-1221] import snapshots into a workspace #347

Merged
merged 18 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/src/wfl/environment.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"WFL_COOKIE_SECRET"
#(-> "secret/dsde/gotc/dev/zero" vault-secrets :cookie_secret)
"WFL_TDR_URL"
#(-> "https://jade.datarepo-dev.broadinstitute.org/")
#(-> "https://jade.datarepo-dev.broadinstitute.org")
"WFL_OAUTH2_CLIENT_ID"
#(-> "secret/dsde/gotc/dev/zero" vault-secrets :oauth2_client_id)
"WFL_POSTGRES_PASSWORD"
Expand All @@ -47,7 +47,7 @@
"WFL_POSTGRES_USERNAME"
#(-> nil)
"WFL_FIRECLOUD_URL"
#(-> "https://api.firecloud.org/")
#(-> "https://api.firecloud.org")

;; -- variables used in test code below this line --
"WFL_CROMWELL_URL"
Expand Down
140 changes: 81 additions & 59 deletions api/src/wfl/service/datarepo.clj
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
(ns wfl.service.datarepo
"Do stuff in the data repo."
(:require [clj-http.client :as http]
[clojure.data.json :as json]
[clojure.string :as str]
[wfl.auth :as auth]
[wfl.environment :as env]
[wfl.mime-type :as mime-type]
[wfl.util :as util])
(:require [clj-http.client :as http]
[clojure.data.json :as json]
[clojure.string :as str]
[wfl.auth :as auth]
[wfl.environment :as env]
[wfl.mime-type :as mime-type]
[wfl.service.google.bigquery :as bigquery]
[wfl.util :as util])
(:import (java.time Instant)
(java.util.concurrent TimeUnit)))

(defn ^:private datarepo-url [& parts]
(let [url (util/slashify (env/getenv "WFL_TDR_URL"))]
(apply str url parts)))
(let [url (util/de-slashify (env/getenv "WFL_TDR_URL"))]
(str/join "/" (conj parts url))))
ehigham marked this conversation as resolved.
Show resolved Hide resolved

(def ^:private repository
"API URL for Data Repo API."
(partial datarepo-url "api/repository/v1/"))
(partial datarepo-url "api/repository/v1"))

(defn ^:private get-repository-json [& parts]
ehigham marked this conversation as resolved.
Show resolved Hide resolved
(-> (apply repository parts)
(http/get {:headers (auth/get-auth-header)})
util/response-body-json))

(defn dataset
"Query the DataRepo for the Dataset with `dataset-id`."
[dataset-id]
(-> (repository "datasets/" dataset-id)
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json))
(get-repository-json "datasets" dataset-id))

(defn ^:private ingest
"Ingest THING to DATASET-ID according to BODY."
[thing dataset-id body]
(-> (repository (format "datasets/%s/%s" dataset-id thing))
(-> (repository "datasets" dataset-id thing)
(http/post {:content-type :application/json
:headers (auth/get-service-account-header)
:body (json/write-str body :escape-slash false)})
Expand All @@ -38,8 +42,7 @@
;; While TDR does assign `loadTag`s, they're not always unique - submitting
;; requests in parallel can cause bad things to happen. Use this to create a
;; unique `loadTag` instead.
(defn ^:private new-load-tag []
(str "workflow-launcher:" (Instant/now)))
(defn ^:private new-load-tag [] (str "workflow-launcher:" (Instant/now)))
ehigham marked this conversation as resolved.
Show resolved Hide resolved

(defn ingest-file
"Ingest `source` file as `target` using `dataset-id` and `profile-id`."
Expand Down Expand Up @@ -74,24 +77,23 @@
"ingest"
dataset-id
{:format "json"
:load_tag "string"
:load_tag (new-load-tag)
:max_bad_records 0
:path path
:table table}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Odd formatting, but old.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How could it be improved?


(defn poll-job
"Return result for JOB-ID in ENVIRONMENT when it stops running."
ehigham marked this conversation as resolved.
Show resolved Hide resolved
[job-id]
(let [get-result #(-> (repository "jobs/" job-id "/result")
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json)
running? #(-> (repository "jobs/" job-id)
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json
:job_status
#{"running"})]
(while (running?) (.sleep TimeUnit/SECONDS 1))
(get-result)))
([job-id seconds]
(let [result #(get-repository-json "jobs" job-id "result")
running? #(-> (get-repository-json "jobs" job-id)
:job_status
#{"running"})]
(while (running?)
(.sleep TimeUnit/SECONDS seconds))
(result)))
([job-id]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clojure q - is a multi-arity function the best / only way to provide default argument(s)? What about the case where you might wish to allow multiple default arguments?

Copy link
Member Author

@ehigham ehigham Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's certainly one way, not sure if it's the best way. You can simulate named arguments by taking key-value pairs. Here's one way to implement that. There other ways and I'm not sure this is the best:

(let [defaults {:foo 'value1
                :bar 'value2
                :baz 'value3}]
  (defn my-named-args-function
    [& options]
    (let [inputs (apply assoc defaults options)]
      (use inputs))))

;; usage
(my-named-args-function :bar 12 :baz pi)

(poll-job job-id 5)))

(defn create-dataset
"Create a dataset with EDN `dataset-request` and return the id
Expand All @@ -113,17 +115,15 @@
(defn delete-dataset
"Delete the Dataset with `dataset-id`."
[dataset-id]
(-> (repository "datasets/" dataset-id)
(-> (repository "datasets" dataset-id)
ehigham marked this conversation as resolved.
Show resolved Hide resolved
(http/delete {:headers (auth/get-service-account-header)})
util/response-body-json
:id
poll-job))

;; Note the TDR is under active development,
;; the endpoint spec is getting changed so the
;; spec in this function is not consistent with
;; the TDR Swagger page in order to make the
;; request work.
;; Note the TDR is under active development, the endpoint spec is getting
;; changed so the spec in this function is not consistent with the TDR Swagger
;; page in order to make the request work.
;; See also https://cloud.google.com/bigquery/docs/reference/standard-sql/migrating-from-legacy-sql
(defn create-snapshot
"Return snapshot-id when the snapshot defined
Expand Down Expand Up @@ -167,7 +167,7 @@
(defn delete-snapshot
"Delete the Snapshot with `snapshot-id`."
[snapshot-id]
(-> (repository "snapshots/" snapshot-id)
(-> (repository "snapshots" snapshot-id)
(http/delete {:headers (auth/get-service-account-header)})
util/response-body-json
:id
Expand All @@ -176,30 +176,7 @@
(defn snapshot
"Return the snapshot with `snapshot-id`."
[snapshot-id]
(-> (repository "snapshots/" snapshot-id)
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json))

;; Note if there are no matching rows between (start, end], TDR will throw
;; a 400 exception.
;; Note TDR prefixes datasets in BigQuery with `datarepo_`.
(defn make-snapshot-query
"Make row-id query payload from `dataset` and `table`,
given a date range specified by exclusive `start` and inclusive `end`.

Parameters
----------
_dataset - Dataset information response from TDR.
table - Name of the table in the dataset schema to query from.
start - The start date object in the timeframe to query exclusively.
end - The end date object in the timeframe to query inclusively."
[{:keys [name dataProject] :as _dataset} table start end]
(let [dataset-name (str "datarepo_" name)
query (str/join \newline ["SELECT datarepo_row_id"
"FROM `%s.%s.%s`"
"WHERE datarepo_ingest_date > '%s'"
"AND datarepo_ingest_date <= '%s'"])]
(format query dataProject dataset-name table start end)))
(get-repository-json "snapshots" snapshot-id))

(defn all-columns
"Return all of the columns of `table` in `dataset` content."
Expand All @@ -224,3 +201,48 @@
:description description
:name name
:profileId defaultProfileId}))

;; hack - TDR adds the "datarepo_" prefix to the dataset name in BigQuery
;; They plan to expose this name via `GET /api/repository/v1/datasets/{id}`
;; in a future release.
ehigham marked this conversation as resolved.
Show resolved Hide resolved
(defn ^:private bigquery-name
"Get the BigQuery name of the dataset or snapshot"
[{:keys [name] :as dataset-or-snapshot}]
(letfn [(snapshot? [x] (util/absent? x :defaultSnapshotId))]
(if (snapshot? dataset-or-snapshot) name (str "datarepo_" name))))

(defn ^:private query-table-impl
([{:keys [dataProject] :as dataset} table col-spec]
(let [bq-name (bigquery-name dataset)]
(->> (format "SELECT %s FROM `%s.%s.%s`" col-spec dataProject bq-name table)
(bigquery/query-sync dataProject)))))

(defn query-table
"Query everything or optionally the `columns` in `table` in the Terra DataRepo
`dataset`, where `dataset` is a DataRepo dataset or a snapshot of a dataset."
([dataset table]
(query-table-impl dataset table "*"))
([dataset table columns]
(->> (util/to-comma-separated-list (map name columns))
(query-table-impl dataset table))))

(defn ^:private query-table-between-impl
ehigham marked this conversation as resolved.
Show resolved Hide resolved
[{:keys [dataProject] :as dataset} table [start end] col-spec]
(let [bq-name (bigquery-name dataset)
query "SELECT %s
FROM `%s.%s.%s`
WHERE datarepo_ingest_date > '%s'
AND datarepo_ingest_date <= '%s'"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

query will print awkwardly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you mean?

(->> (format query col-spec dataProject bq-name table start end)
(bigquery/query-sync dataProject))))

(defn query-table-between
"Query everything or optionally the `columns` in `table` in the Terra DataRepo
`dataset` in the closed-open `interval` of `datarepo_ingest_date`, where
`dataset` is a DataRepo dataset or a snapshot of a dataset. If no rows match
the `interval`, TDR will respond with error 400."
ehigham marked this conversation as resolved.
Show resolved Hide resolved
([dataset table interval]
(query-table-between-impl dataset table interval "*"))
([dataset table interval columns]
(->> (util/to-comma-separated-list (map name columns))
(query-table-between-impl dataset table interval))))
61 changes: 44 additions & 17 deletions api/src/wfl/service/firecloud.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@
[wfl.util :as util]))

(defn ^:private firecloud-url [& parts]
(let [url (util/slashify (env/getenv "WFL_FIRECLOUD_URL"))]
(apply str url parts)))
(let [url (util/de-slashify (env/getenv "WFL_FIRECLOUD_URL"))]
(str/join "/" (conj parts url))))
ehigham marked this conversation as resolved.
Show resolved Hide resolved

(def workspace-api-url (partial firecloud-url "api/workspaces/"))
(def ^:private workspace-api-url
(partial firecloud-url "api/workspaces"))

(defn ^:private get-workspace-json [& parts]
(-> (apply workspace-api-url parts)
(http/get {:headers (auth/get-auth-header)})
util/response-body-json))
ehigham marked this conversation as resolved.
Show resolved Hide resolved

(defn abort-submission
"Abort the submission with `submission-id` in the Terra `workspace`."
[workspace submission-id]
(-> (workspace-api-url (str/join "/" [workspace "submissions" submission-id]))
(-> (workspace-api-url workspace "submissions" submission-id)
(http/delete {:headers (auth/get-auth-header)})))

(defn create-submission
Expand All @@ -38,30 +44,27 @@
util/response-body-json
:submissionId)))

(defn get-workspace
"Get a single `workspace`'s details"
[workspace]
(get-workspace-json workspace))
ehigham marked this conversation as resolved.
Show resolved Hide resolved

(defn get-submission
"Return the submission in the Terra `workspace` with `submission-id`."
[workspace submission-id]
(-> (workspace-api-url workspace "/submissions/" submission-id)
(http/get {:headers (auth/get-auth-header)})
util/response-body-json))
(get-workspace-json workspace "submissions" submission-id))

(defn get-workflow
"Query the `firecloud-url` for the the `workflow` created by the `submission`
in the Terra `workspace`."
[workspace submission-id workflow-id]
(-> (str/join "/" [workspace "submissions" submission-id "workflows" workflow-id])
workspace-api-url
(http/get {:headers (auth/get-auth-header)})
util/response-body-json))
(get-workspace-json workspace "submissions" submission-id "workflows" workflow-id))

(defn get-workflow-outputs
"Query the `firecloud-url` for the outputs of the `workflow` created by
the `submission` in the Terra `workspace`."
[workspace submission-id workflow-id]
(-> (str/join "/" [workspace "submissions" submission-id "workflows" workflow-id "outputs"])
workspace-api-url
(http/get {:headers (auth/get-auth-header)})
util/response-body-json))
(get-workspace-json workspace "submissions" submission-id "workflows" workflow-id "outputs"))

(defn get-workflow-status-by-entity
"Get workflow status given a Terra submission-id and entity-name."
Expand All @@ -73,6 +76,20 @@
first
:status)))

(defn delete-entities
"Delete the `entities` from the Terra `workspace`.
Parameters
----------
workspace - Terra Workspace to delete entities from
entities - list of entity `[type name]` pairs"
[workspace entities]
(letfn [(make-entity [[type name]] {:entityType type :entityName name})]
(-> (workspace-api-url workspace "entities" "delete")
(http/post {:headers (auth/get-auth-header)
:content-type :application/json
:body (json/write-str (map make-entity entities))})
util/response-body-json)))

(defn import-entities
"Import sample entities into a Terra WORKSPACE from a tsv FILE.
The upload requires owner permission on the workspace.
Expand All @@ -86,14 +103,24 @@
-------
(import-entities \"workspace-namespace/workspace-name\" \"./samples.tsv\")"
[workspace file]
(-> (workspace-api-url workspace "/flexibleImportEntities")
(-> (workspace-api-url workspace "flexibleImportEntities")
(http/post {:headers (auth/get-auth-header)
:multipart (util/multipart-body
{:Content/type "text/tab-separated-values"
:entities (slurp file)})})))

(defn list-entities
"List all entities with `entity-type` in `workspace`."
[workspace entity-type]
(get-workspace-json workspace "entities" entity-type))

(defn list-entity-types
"List the entity types along with their attributes in `workspace`."
[workspace]
(get-workspace-json workspace "entities"))

(defn describe-workflow
"Get a machine-readbale description of the `workflow`, including its inputs
"Get a machine-readable description of the `workflow`, including its inputs
ehigham marked this conversation as resolved.
Show resolved Hide resolved
and outputs. `workflow` can either be a url or the workflow source code."
[workflow]
(letfn [(url? [s] (some #(str/starts-with? s %) ["http://" "https://"]))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old, but pretty predicate. Would probably use partial though.
{(if (url? workflow) :workflowUrl :workflowSource) workflow}

Copy link
Member Author

@ehigham ehigham Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you use partial here? Can you give an example?

Expand Down
34 changes: 17 additions & 17 deletions api/src/wfl/service/google/bigquery.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@
util/response-body-json
:tables))

(defn ^:private normalize-table [{:keys [schema] :as response}]
(let [repeated? (mapv #(= "REPEATED" (:mode %)) (:fields schema))]
rexwangcc marked this conversation as resolved.
Show resolved Hide resolved
(letfn [(flatten-column [idx column]
(if (repeated? idx) (mapv :v (:v column)) (:v column)))
(flatten-row [row] (vec (map-indexed flatten-column (:f row))))]
(update response :rows #(map flatten-row %)))))
ehigham marked this conversation as resolved.
Show resolved Hide resolved

(defn query-sync
"Given QUERY, look for rows in a BigQuery table within a
Google Cloud PROJECT synchronously, using non-legacy query
Expand All @@ -56,15 +63,13 @@
project - Google Cloud Project to list the BigQuery datasets in.
query - BigQuery Standard SQL query string."
[project query]
(letfn [(flatten-rows [rows] (mapv #(map :v (:f %)) rows))]
(-> (str/join "/" ["projects" project "queries"])
bigquery-url
(http/post {:headers (auth/get-auth-header)
:body (json/write-str
{:query query
:use_legacy_sql false})})
util/response-body-json
(update :rows flatten-rows))))
(-> (str/join "/" ["projects" project "queries"])
bigquery-url
(http/post {:headers (auth/get-auth-header)
:body (json/write-str {:query query
:use_legacy_sql false})})
util/response-body-json
normalize-table))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: Maybe these *-url helpers could take a seq?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would that help? What would the seq be used for?


(defn dump-table->tsv
"Dump a BigQuery TABLE/view into a tsv FILE that's supported by Terra.
Expand All @@ -88,15 +93,10 @@
([table terra-data-table file]
(letfn [(format-header-for-terra [header]
(cons (format "entity:%s_id" terra-data-table) (rest header)))]
(let [headers (map :name (get-in table [:schema :fields]))
rows (get-in table [:rows])
contents (-> []
(into [(format-header-for-terra headers)])
(into rows))]
(let [headers (map :name (get-in table [:schema :fields]))
contents (conj (:rows table) (format-header-for-terra headers))]
(with-open [writer (io/writer file)]
(csv/write-csv writer
contents
:separator \tab)
(csv/write-csv writer contents :separator \tab)
file))))
([table terra-data-table]
(str (dump-table->tsv table terra-data-table (StringWriter.)))))
Loading