Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GH-1172] Bulk Ingest Workflow Outputs #331

Merged
merged 14 commits into from
Feb 25, 2021
28 changes: 20 additions & 8 deletions api/src/wfl/mime_type.clj
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
(ns wfl.mime-type
(:require [clojure.string :as str]
[ring.util.mime-type :as mime-type]))
[ring.util.mime-type :as mime-type]
[wfl.util :as util]))

(def ^:private mime-types
"mime-types for file extensions commonly used in computational biology."
(let [bio-mimes {"bam" "application/octet-stream"
"cram" "application/octet-stream"
"fasta" "application/octet-stream"
"vcf" "text/plain"}]
(merge mime-type/default-mime-types bio-mimes)))
(let [extensions
{"bam" "application/octet-stream"
"cram" "application/octet-stream"
"fasta" "application/octet-stream"
"genbank" "application/octet-stream"
"ready" "text/plain"
"tsv" "text/tab-separated-values"
"vcf" "text/plain"}]
(merge mime-type/default-mime-types extensions)))

;; visible for testing
(defn ext-mime-type-no-default
"Look up the mime-type of the filename by file extension."
[filename]
(loop [filename (util/basename filename)]
(if-let [ext (util/extension filename)]
(or (mime-types ext) (recur (util/remove-extension filename))))))
rexwangcc marked this conversation as resolved.
Show resolved Hide resolved

(defn ext-mime-type
"Look up the mime-type of the filename by file extension."
[filename]
(if-let [ext (last (str/split filename #"\."))]
(mime-types ext)))
(or (ext-mime-type-no-default filename) "application/octet-stream"))
45 changes: 26 additions & 19 deletions api/src/wfl/service/datarepo.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
[wfl.environment :as env]
[wfl.mime-type :as mime-type]
[wfl.util :as util])
(:import (java.util.concurrent TimeUnit)))
(:import (java.time Instant)
(java.util.concurrent TimeUnit)))

(defn ^:private datarepo-url [& parts]
(let [url (util/slashify (env/getenv "WFL_TERRA_DATA_REPO_URL"))]
Expand All @@ -33,15 +34,38 @@
util/response-body-json
:id))

;; While TDR does assign `loadTag`s, they're not always unique - submitting
;; requests in parallel can cause bad things to happen. Use this to create a
;; unique `loadTag` instead.
rexwangcc marked this conversation as resolved.
Show resolved Hide resolved
(defn ^:private new-load-tag []
(str "workflow-launcher:" (Instant/now)))

(defn ingest-file
"Ingest `source` file as `target` using `dataset-id` and `profile-id`."
[dataset-id profile-id source target]
(ingest "files" dataset-id {:description (util/basename source)
:profileId profile-id
:loadTag (new-load-tag)
:mime_type (mime-type/ext-mime-type source)
:profileId profile-id
:source_path source
:target_path target}))

(defn bulk-ingest
"Ingest `source` file as `target` using `dataset-id` and `profile-id`."
[dataset-id profile-id source->target]
(letfn [(make-file-load [source target]
{:description (util/basename source)
:mimeType (mime-type/ext-mime-type source)
:sourcePath source
:targetPath target})]
(ingest
"files/bulk/array"
dataset-id
{:profileId profile-id
:loadArray (map #(apply make-file-load %) source->target)
:loadTag (new-load-tag)
:maxFailedFileLoads 0})))

(defn ingest-table
"Ingest TABLE at PATH to DATASET-ID and return the job ID."
[dataset-id path table]
Expand Down Expand Up @@ -94,20 +118,3 @@
util/response-body-json
:id
poll-job))

(comment
(def successful-file-ingest-response
{:description "something derived from file name + extension?"
:path "/zero-test/NA12878_PLUMBING.g.vcf.gz"
:directoryDetail nil
:collectionId "f359303e-15d7-4cd8-a4c7-c50499c90252"
:fileDetail {:datasetId "f359303e-15d7-4cd8-a4c7-c50499c90252"
:mimeType "text/plain"
:accessUrl "gs://broad-jade-dev-data-bucket/f359303e-15d7-4cd8-a4c7-c50499c90252/271cd32c-2e86-4f46-9eb1-f3ddb44a6c1f"}
:fileType "file"
:created "2019-11-26T15:41:06.508Z"
:checksums [{:checksum "591d9cec" :type "crc32c"}
{:checksum "24f38b33c6eac4dd3569e0c4547ced88" :type "md5"}]
:size 3073329
:fileId "271cd32c-2e86-4f46-9eb1-f3ddb44a6c1f"}))

24 changes: 16 additions & 8 deletions api/src/wfl/service/google/pubsub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,17 @@

(defn set-iam-policy
"Set the IAM policy for the given Pub/Sub `resource` with the specified
`policy` bindings. A Pub/Sub `resource` is a topic, a subscription or a
snapshot."
[resource policy]
(http/post
(pubsub-url resource ":setIamPolicy")
{:headers (auth/get-auth-header)
:content-type :json
:body (json/write-str {:policy policy} :escape-slash false)}))

(defn add-iam-policy
"Augment the IAM policy for the given Pub/Sub `resource` with the specified
role->member bindings. A Pub/Sub `resource` is a topic, a subscription or a
snapshot.

Expand All @@ -173,11 +184,8 @@
role->members - map from a Google Cloud Pub/Sub role to a list of members.
See https://cloud.google.com/pubsub/docs/access-control#permissions_and_roles"
[resource role->members]
(letfn [(make-binding [[role members]] {:role role :members members})]
(http/post
(pubsub-url resource ":setIamPolicy")
{:headers (auth/get-auth-header)
:content-type :json
:body (json/write-str
{:policy {:bindings (map make-binding role->members)}}
:escape-slash false)})))
(letfn [(make-binding [[role members]] [{:role role :members members}])]
(set-iam-policy
resource
(-> (get-iam-policy resource)
(update :bindings #(concat % (map make-binding role->members)))))))
35 changes: 24 additions & 11 deletions api/src/wfl/service/google/storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,21 @@
(with-out-str (pprint object))]))))
(gs-url bucket name))

(defn iam
"Return IamPolicy response for URL."
[url]
(let [[bucket _] (parse-gs-url url)]
(-> {:method :get ; :debug true :debug-body true
:url (str bucket-url bucket "/iam")
;; :query-params {:project project :prefix prefix}
:content-type :application/json
:headers (auth/get-auth-header)}
http/request :body
(json/read-str :key-fn keyword))))
(defn get-iam-policy
"Return IamPolicy response for `bucket`."
[bucket]
(-> (str bucket-url bucket "/iam")
(http/get {:headers (auth/get-auth-header)})
util/response-body-json))

(defn set-iam-policy
"Set IamPolicy `bucket` to `policy`."
[bucket policy]
(-> (str bucket-url bucket "/iam")
(http/put {:headers (auth/get-auth-header)
:content-type :application/json
:body (json/write-str policy :escape-slash false)})
util/response-body-json))

(defn list-buckets
"The buckets in PROJECT named with optional PREFIX."
Expand Down Expand Up @@ -242,6 +246,15 @@
([email url]
(apply add-object-reader email (parse-gs-url url))))

(defn add-storage-object-viewer
"Give service-account `email` the \"Storage Object Viewer\" role in `bucket`."
[email bucket]
(let [new-binding [{:role "roles/storage.objectViewer"
:members [(str "serviceAccount:" email)]}]]
(-> (get-iam-policy bucket)
(update :bindings cons new-binding)
(->> (set-iam-policy bucket)))))

(defn patch-bucket!
"Patch BUCKET in PROJECT with METADATA."
[project bucket metadata]
Expand Down
14 changes: 8 additions & 6 deletions api/src/wfl/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,11 @@
(defn parse-int [s] (do-or-nil (Integer/parseInt s)))
(defn parse-boolean [s] (do-or-nil (Boolean/valueOf s)))

(defn parse-json [^String object]
(defn parse-json
"Parse json `object` into keyword->object map recursively"
[^String object]
(json/read-str object :key-fn keyword))

(defn parse-json
"Parse the json string STR into a keyword-string map"
[str]
(json/read-str str :key-fn keyword))

(defn response-body-json [response]
"Return the :body of the http `response` as JSON"
(-> response :body (or "null") parse-json))
Expand Down Expand Up @@ -82,6 +78,12 @@
(subs filename 0 idx)
filename))

(defn extension
"Return the (last) file extension from `filename`, if one exists."
[filename]
(if-let [idx (str/last-index-of filename ".")]
(subs filename (inc idx) (count filename))))

(defn basename
"Strip directory from `filename`."
[filename]
Expand Down
Loading