Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GH-1678] When monitoring source load tags, match prefix instead of exact match. #614

Merged
merged 7 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions api/src/wfl/api/handlers.clj
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
(ns wfl.api.handlers
"Define handlers for API endpoints."
(:require [clojure.set :refer [rename-keys]]
[wfl.wfl :as wfl]
[ring.util.http-response :as response]
[wfl.api.workloads :as workloads]
[wfl.configuration :as config]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.module.aou :as aou]
[wfl.service.google.storage :as gcs]
[wfl.service.postgres :as postgres]
[wfl.util :as util])
(:require [clojure.set :refer [rename-keys]]
[wfl.wfl :as wfl]
[ring.util.http-response :as response]
[wfl.api.workloads :as workloads]
[wfl.configuration :as config]
[wfl.executor :as executor]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.module.aou :as aou]
[wfl.service.google.storage :as gcs]
[wfl.service.postgres :as postgres]
[wfl.util :as util])
(:import [wfl.util UserException]))

(defn succeed
Expand Down Expand Up @@ -95,7 +96,7 @@
(let [uuid (get-in request [:path-params :uuid])
filters (-> request
(get-in [:parameters :query])
(select-keys [:submission :status]))]
(select-keys executor/workflow-filter-keys))]
(jdbc/with-db-transaction [tx (postgres/wfl-db-config)]
(->> (let [workload (workloads/load-workload-for-uuid tx uuid)]
(if (empty? filters)
Expand All @@ -104,8 +105,7 @@
(mapv util/to-edn)
succeed))))

;; Visible for testing
(def retry-no-workflows-error-message
(def ^:private retry-no-workflows-error-message
"No workflows to retry for workload and requested workflow filters.")

(defn post-retry
Expand Down
33 changes: 18 additions & 15 deletions api/src/wfl/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -547,27 +547,30 @@
;; not already an active workflow at the `HEAD`.
;;
;; ** Admittedly, this is still a problem with this design.
(defn ^:private terra-executor-workflows-sql-params
  "Build the sequence `(sql & params)` selecting the non-retried workflows
  recorded in the `details` table of the executor.  A `submission` and/or
  `status` filter, when supplied, adds one `AND ... = ?` clause and
  contributes one positional parameter, in that order."
  [{:keys [details] :as _executor} {:keys [submission status] :as _filters}]
  (let [template (util/remove-empty-and-join
                  ["SELECT * FROM %s"
                   "WHERE workflow IS NOT NULL"
                   "AND retry IS NULL"
                   (when submission "AND submission = ?")
                   (when status "AND status = ?")
                   "ORDER BY id ASC"])
        ;; The table name is spliced into the SQL text, not bound as a parameter.
        sql      (format template details)]
    ;; Absent filters are nil; drop them so params line up with the `?`s.
    (remove nil? (cons sql [submission status]))))

;; The keys a caller may filter workflows by.  The order of this vector
;; is significant: `filter-query-for-unretried-workflows` reduces over it,
;; so it fixes the order of the generated `AND <key> = ?` clauses and of
;; the positional parameters that accompany them.
(def workflow-filter-keys
"The workflow filter keys ordered for efficient queries."
[:submission :status])

(defn ^:private filter-query-for-unretried-workflows
  "Return a `[sql & params]` vector that selects the un-retried workflows
  in the `:details` table of `executor`, restricted by `filters`.
  Each key of `workflow-filter-keys` with a truthy value in `filters`
  adds one `AND <key> = ?` clause and one positional parameter, in
  `workflow-filter-keys` order."
  [executor filters]
  (let [active  (filterv #(get filters %) workflow-filter-keys)
        clauses (map #(str "AND " (name %) " = ?") active)
        sql     (str/join \space
                          (concat ["SELECT * FROM" (:details executor)
                                   "WHERE workflow IS NOT NULL AND retry IS NULL"]
                                  clauses
                                  ["ORDER BY id ASC"]))]
    ;; Parameters follow the SQL text in the same order as their clauses.
    (into [sql] (map #(get filters %) active))))

(defn ^:private terra-executor-workflows
"Return all the non-retried workflows executed by `executor`
matching specified `filters`."
[tx {:keys [details] :as executor} filters]
(postgres/throw-unless-table-exists tx details)
(let [sql-params (terra-executor-workflows-sql-params executor filters)
(let [sql-params (filter-query-for-unretried-workflows executor filters)
executor-subset (select-keys executor [:workspace :methodConfiguration])]
(map #(merge executor-subset %) (jdbc/query tx sql-params))))

Expand Down
14 changes: 5 additions & 9 deletions api/src/wfl/service/datarepo.clj
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,10 @@
(query-table-impl dataset table
(util/to-comma-separated-list (map name columns)))))

;; Public for testing
(def metadata-table-name-prefix "datarepo_row_metadata_")

(defn metadata
(defn ^:private metadata
"Return TDR row metadata table name corresponding to `table-name`."
[table-name]
(format "%s%s" metadata-table-name-prefix table-name))
(str "datarepo_row_metadata_" table-name))

(defn ^:private query-metadata-table-impl
"Return `col-spec` from rows from TDR `dataset`.`table` metadata
Expand All @@ -325,12 +322,11 @@
(format "ingest_time BETWEEN '%s' AND '%s'"
start end))
where-load-tag (when loadTag
(format "load_tag = '%s'" loadTag))
(format "load_tag LIKE '%s%%'" loadTag))
where-clauses (util/remove-empty-and-join
[where-ingest-time where-load-tag] " AND ")
where (if-not (empty? where-clauses)
(format "WHERE %s" where-clauses)
"")]
where (if (empty? where-clauses) ""
(format "WHERE %s" where-clauses))]
(-> "SELECT %s FROM `%s` %s"
(format col-spec (bq-path dataset-or-snapshot meta-table) where)
(->> (bigquery/query-sync dataProject)))))
Expand Down
2 changes: 1 addition & 1 deletion api/test/wfl/integration/source_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@

(deftest test-create-tdr-source-with-invalid-dataset-metadata-table
(with-redefs
[datarepo/metadata-table-name-prefix "wrong-metadata-table-name-"]
[datarepo/metadata (partial str "wrong-metadata-table-name-")]
(is (thrown-with-msg?
UserException #"TDR row metadata table not found in BigQuery"
(source/datarepo-source-validate-request-or-throw
Expand Down
10 changes: 6 additions & 4 deletions api/test/wfl/system/v1_endpoint_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,13 @@
{:submission submission :status status-unretriable}]
filters-valid [{:submission submission}
{:submission submission :status status-retriable}]]
(run! (partial should-throw-400
executor/terra-executor-retry-filters-invalid-error-message)
(run! (partial
should-throw-400
executor/terra-executor-retry-filters-invalid-error-message)
filters-invalid)
(run! (partial should-throw-400
handlers/retry-no-workflows-error-message)
(run! (partial
should-throw-400
@#'handlers/retry-no-workflows-error-message)
filters-valid))))
(testing "/start staged workload"
(let [{:keys [created started] :as response}
Expand Down
10 changes: 5 additions & 5 deletions api/test/wfl/unit/executor_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,35 @@
(:import [java.util UUID]
[wfl.util UserException]))

(deftest test-terra-executor-workflows-sql-params
(deftest test-filter-query-for-unretried-workflows
(let [executor {:details "TerraExecutor_00000001"}
submission (str (UUID/randomUUID))
status "Failed"
filters {:submission submission :status status}]
(letfn [(arg-count [sql] (-> sql frequencies (get \? 0)))]
(testing "No filters"
(let [[sql & params] (#'executor/terra-executor-workflows-sql-params
(let [[sql & params] (#'executor/filter-query-for-unretried-workflows
executor {})]
(is (== 0 (arg-count sql))
"No filters should yield no query arguments")
(is (empty? params)
"No filters should yield no parameters")))
(testing "Submission filter only"
(let [[sql & params] (#'executor/terra-executor-workflows-sql-params
(let [[sql & params] (#'executor/filter-query-for-unretried-workflows
executor (select-keys filters [:submission]))]
(is (== 1 (arg-count sql))
"Single filter (submission) should yield 1 query argument")
(is (= [submission] params)
"Submission filter should yield lone submission parameter")))
(testing "Status filter only"
(let [[sql & params] (#'executor/terra-executor-workflows-sql-params
(let [[sql & params] (#'executor/filter-query-for-unretried-workflows
executor (select-keys filters [:status]))]
(is (== 1 (arg-count sql))
"Single filter (status) should yield 1 query argument")
(is (= [status] params)
"Status filter should yield lone status parameter")))
(testing "Submission and status filters"
(let [[sql & params] (#'executor/terra-executor-workflows-sql-params
(let [[sql & params] (#'executor/filter-query-for-unretried-workflows
executor (select-keys filters [:submission :status]))]
(is (== 2 (arg-count sql))
"Two filters (submission and status) should yield 2 query arguments")
Expand Down
37 changes: 27 additions & 10 deletions docs/md/staged-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ like:
```
The table below summarises the purpose of each attribute in the above request.

| Attribute | Description |
|--------------------------|----------------------------------------------------------|
| `name` | Selects the `Terra DataRepo` source implementation. |
| `dataset` | The `UUID` of dataset to monitor and read from. |
| `table` | The name of the `dataset` table to monitor and read from.|
| `snapshotReaders` | A list of email addresses to set as `readers` of all snapshots created in this workload.|
| `pollingIntervalMinutes` | Optional. Rate at which WFL will poll TDR for new rows to snapshot.|
| `loadTag` | Optional. Only snapshot new rows ingested to TDR with `loadTag`.|
| Attribute | Description |
|--------------------------|------------------------------------------------------------------------------------------|
| `name` | Selects the `Terra DataRepo` source implementation. |
| `dataset` | The `UUID` of dataset to monitor and read from. |
| `table` | The name of the `dataset` table to monitor and read from. |
| `snapshotReaders` | A list of email addresses to set as `readers` of all snapshots created in this workload. |
| `pollingIntervalMinutes` | Optional. Rate at which WFL will poll TDR for new rows to snapshot. |
| `loadTag` | Optional. Snapshot only new rows ingested with `loadTag`. |

#### `dataset`

Expand Down Expand Up @@ -84,8 +84,25 @@ If not provided, the default interval is 20 minutes.

#### Optional `loadTag`

WFL will only snapshot new rows ingested to TDR with `loadTag`.
If not provided, all new rows will be eligible for snapshotting.
Specifying a `loadTag`
restricts the set of new rows
that WFL will include in a snapshot.
The `loadTag` specifies
a prefix to be matched
against the `loadTag` used
to ingest data to TDR.
Thus `my-ingest-tag` matches
all of the following load tags.

- `my-ingest-tag`
- `my-ingest-tag-`
- `my-ingest-tag-0`
- `my-ingest-tag-a`
- `my-ingest-tag-2022-07-14`

When not provided,
all new rows will be eligible
for snapshotting.

!!! note
When initiating a TDR ingest, one can optionally set a load tag.
Expand Down