Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

[GH-1604] Add error handling to Slack service #584

Merged
merged 9 commits into from
Mar 2, 2022
5 changes: 5 additions & 0 deletions .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,9 @@ jobs:
env:
WFL_POSTGRES_USERNAME: postgres
WFL_POSTGRES_PASSWORD: postgres
#
# Assign env variables from Atlantis-managed GitHub Secrets:
# https://github.com/broadinstitute/terraform-ap-deployments/blob/master/github/tfvars/broadinstitute-wfl.tfvars
#
WFL_SLACK_TOKEN: ${{ secrets.WFL_SLACK_TOKEN }}
run: make ${MODULES} TARGET=integration
5 changes: 5 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ jobs:
env:
WFL_POSTGRES_USERNAME: postgres
WFL_POSTGRES_PASSWORD: postgres
#
# Assign env variables from Atlantis-managed GitHub Secrets:
# https://github.com/broadinstitute/terraform-ap-deployments/blob/master/github/tfvars/broadinstitute-wfl.tfvars
#
WFL_SLACK_TOKEN: ${{ secrets.WFL_SLACK_TOKEN }}
run: USER='Automated Release Action' make ${MODULES} TARGET=integration

- name: Images
Expand Down
8 changes: 5 additions & 3 deletions api/src/wfl/environment.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
(ns wfl.environment
"Map environment to various values here."
(:require [clojure.data.json :as json]
[clojure.string :as str]
[wfl.log :as log]
[clojure.string :as str]
[wfl.log :as log]
[vault.client.http] ; vault.core needs this
[vault.core :as vault]))
[vault.core :as vault]))

(declare getenv)

Expand Down Expand Up @@ -57,6 +57,8 @@
;; all other values disable them.
"WFL_SLACK_ENABLED"
(fn [] "disabled")
"WFL_SLACK_TOKEN"
#(-> "secret/dsde/gotc/dev/wfl/slack" vault-secrets :bot-user-token)

;; -- variables used in test code below this line --
"WFL_CROMWELL_URL"
Expand Down
2 changes: 1 addition & 1 deletion api/src/wfl/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,5 @@
(let [port (util/is-non-negative! (first args))
manager (start-workload-manager)
logger (start-logging-polling)
slacker (slack/start-notification-loop slack/notifier)]
slacker (slack/start-notification-loop)]
(await-some manager logger slacker (start-webserver port))))
113 changes: 60 additions & 53 deletions api/src/wfl/service/slack.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,11 @@
[wfl.api.workloads :as workloads])
(:import [clojure.lang PersistentQueue]))

;; Slack Bot User token obtained from Vault
(defonce ^:private token
(delay (:bot-user-token
(#'env/vault-secrets "secret/dsde/gotc/dev/wfl/slack"))))

(def enabled-env-var-name "WFL_SLACK_ENABLED")

;; FIXME: suppress warning `javax.mail.internet.AddressException: Missing final '@domain'`
;;
(defn ^:private valid-channel-id?
[channel-id]
(and (not (util/email-address? channel-id))
(str/starts-with? channel-id "C")))
(str/starts-with? channel-id "C"))

(defn slack-channel-watcher? [s]
(when-let [[tag value & _] s]
Expand All @@ -38,52 +30,63 @@
[url description]
(format "<%s|%s>" url description))

(defn ^:private slack-api-raise-for-status
"Slack API has its own way of reporting
statuses, so we need to parse the `body`
to raise for status."
[body]
(let [response (json/read-str body)]
(when-not (response "ok")
(throw (ex-info "failed to notify via Slack"
{:error (response "error")})))
;; More information on the meaning of error responses:
;; https://api.slack.com/methods/chat.postMessage#errors
;;
(defn ^:private post-message
  "POST `message` for `channel` to Slack's chat.postMessage endpoint and
  return the response as parsed by `util/response-body-json`.
  The bearer token is read from the WFL_SLACK_TOKEN environment value."
  [channel message]
  (let [token   (env/getenv "WFL_SLACK_TOKEN")
        payload (json/write-str {:channel channel :text message})
        request {:headers      {:Authorization (str "Bearer " token)}
                 :content-type :application/json
                 :body         payload}]
    (util/response-body-json
     (http/post "https://slack.com/api/chat.postMessage" request))))

;; Slack API has its own way of reporting statuses:
;; https://api.slack.com/web#slack-web-api__evaluating-responses
;;
(defn ^:private post-message-or-throw
  "Post `message` to `channel` and return Slack's response.
  Throw an `ex-info` carrying the channel, message, and response
  when the response's :ok value is not truthy."
  [channel message]
  {:pre [(valid-channel-id? channel)]}
  (let [response (post-message channel message)]
    (if (:ok response)
      response
      (throw (ex-info "Slack API chat.postMessage failed"
                      {:channel  channel
                       :message  message
                       :response response})))))

(defn post-message
  "Post `message` to `channel` via Slack's chat.postMessage endpoint and
  run the response body through `slack-api-raise-for-status`.
  With a third argument, pass that result to `callback`."
  ([channel message]
   {:pre [(valid-channel-id? channel)]}
   (let [auth     (str "Bearer " @token)
         payload  (json/write-str {:channel channel
                                   :text    message})
         response (http/post "https://slack.com/api/chat.postMessage"
                             {:headers      {:Authorization auth}
                              :content-type :application/json
                              :body         payload})]
     (slack-api-raise-for-status (:body response))))
  ([channel message callback]
   (callback (post-message channel message))))
(def ^:private notifier (agent (PersistentQueue/EMPTY)))

;; Create the agent queue and attach a watcher
;;
(def notifier (agent (PersistentQueue/EMPTY)))
(add-watch notifier :watcher
(fn [_key _ref _old-state new-state]
(when-let [queue (seq new-state)]
(log/debug "Current notification queue" :queue queue))))
(defn ^:private log-notifier-queue-content
  "Watch function: debug-log the contents of `new-state`
  unless it is empty. The other watch arguments are ignored."
  [_key _ref _old-state new-state]
  (let [pending (seq new-state)]
    (when pending
      (log/debug "Slack queue" :queue pending))))

(add-watch notifier log-notifier-queue-content log-notifier-queue-content)

(defn add-notification
"Add notification defined by a map of
`channel` and `message` to `agent` queue."
[agent {:keys [channel message]}]
(defn ^:private queue-notification
"Add notification of `message` for `channel` to `notifier`."
[{:keys [channel message] :as _notification}]
{:pre [(valid-channel-id? channel)]}
(send agent #(conj % {:channel channel :message message})))
(send notifier conj {:channel channel :message message}))

(defn send-notification
(defn dispatch-notification
"Dispatch the next notification in `queue`."
[queue]
(if (seq queue)
(let [{:keys [channel message]} (peek queue)]
(post-message channel message)
(pop queue))
(if-let [{:keys [channel message] :as notification} (peek queue)]
(let [popped (pop queue)]
(try
(post-message-or-throw channel message)
popped
(catch Throwable throwable
(log/error "post-message-or-throw threw"
:notification notification
:throwable throwable)
(conj popped notification))))
queue))

;; FIXME: add permission checks for slack-channel-watchers
Expand All @@ -104,7 +107,7 @@
(log/info "About to Slack"
:workload (workloads/to-log workload)
:payload (pr-str payload))
(add-notification notifier payload)))]
(queue-notification payload)))]
(run! notify channels)))
(log/info "Slack disabled"
:workload (workloads/to-log workload)
Expand All @@ -115,7 +118,11 @@
(defn start-notification-loop
"Return a future that listens at `agent` and
sends out notifications."
[agent]
(future (while true
(send-off agent send-notification)
(util/sleep-seconds 1))))
[]
(future
(while true
(try
(send-off notifier dispatch-notification)
(catch Throwable throwable
(log/error "dispatch-notification threw" :throwable throwable)))
(util/sleep-seconds 1))))
61 changes: 24 additions & 37 deletions api/test/wfl/integration/slack_test.clj
Original file line number Diff line number Diff line change
@@ -1,41 +1,28 @@
(ns wfl.integration.slack-test
(:require [clojure.test :refer [deftest is testing]]
[wfl.service.slack :as slack]
[wfl.util :as util])
(:require [clojure.test :refer [deftest is testing]]
[wfl.service.slack :as slack]
[wfl.tools.workloads :as workloads]
[wfl.util :as util])
(:import [clojure.lang PersistentQueue]))

(def ^:private testing-agent (agent (PersistentQueue/EMPTY)))
(def ^:private testing-slack-channel "C026PTM4XPA")
(defn ^:private testing-slack-notification
  "Return a notification for `testing-slack-channel` whose message
  embeds the current value of `util/utc-now`."
  []
  (let [stamp   (util/utc-now)
        message (format "WFL Integration Test Message: %s" stamp)]
    {:channel testing-slack-channel
     :message message}))
(defn ^:private make-notification
  "Return a notification for the test channel whose message names the
  workloads project, the current `*ns*`, `fn-name`, and `util/utc-now`."
  [fn-name]
  (let [message (format "%s `%s/%s`: %s"
                        @workloads/project *ns* fn-name (util/utc-now))]
    {:channel "C026PTM4XPA"
     :message message}))

(def ^:private notify-promise (promise))

(defn ^:private mock-send-notification
  "Post the notification at the head of `queue` and return the popped queue.
  The callback delivers true to `notify-promise` when the response's
  \"ok\" entry is truthy and false otherwise. An empty `queue` is
  returned unchanged."
  [queue]
  (if-not (seq queue)
    queue
    (let [{:keys [channel message]} (peek queue)
          report (fn [response]
                   (if (response "ok")
                     (deliver notify-promise true)
                     (deliver notify-promise false)))]
      (slack/post-message channel message report)
      (pop queue))))

(add-watch
testing-agent :watcher
(fn [_key _ref _old-state new-state]
(when (seq new-state)
(testing "notification is added to agent"
(is (= (:channel (first (seq new-state))) testing-slack-channel))))))

;; This test is flaky on GitHub Actions but works fine locally
;;
;; Tagged ^:kaocha/pending. Queues one notification on `testing-agent`,
;; dispatches it, and waits for `notify-promise` to be delivered true.
(deftest ^:kaocha/pending test-send-notification-to-a-slack-channel
;; Redefine `slack/send-notification` to `mock-send-notification`, whose
;; callback delivers to `notify-promise`.
(with-redefs-fn {#'slack/send-notification mock-send-notification}
#(do (slack/add-notification testing-agent (testing-slack-notification))
;; The var (not the function value) is sent so the redefinition is used.
(send-off testing-agent #'slack/send-notification)
(testing "notification can actually be sent to slack"
;; `deref` yields :timeout if nothing is delivered within 10000 ms.
(is (true? (deref notify-promise 10000 :timeout))
"Waited 10s for notification.")))))
;; Queue one notification on a fresh agent, dispatch it, and check the
;; response returned by the private `slack/post-message`.
(deftest test-dispatch-notification
(let [notification (make-notification 'test-dispatch-notification)
notifier (agent (PersistentQueue/EMPTY))
posted (promise)]
;; `mock` replaces `post-message-or-throw`: it calls `post-message`
;; directly and delivers the response to `posted`.
(letfn [(mock [channel message]
(deliver posted (#'slack/post-message channel message)))]
;; Redefine `slack/notifier` to the local agent created above.
(with-redefs [slack/notifier notifier
slack/post-message-or-throw mock]
(#'slack/queue-notification notification)
(send-off notifier #'slack/dispatch-notification)
(testing "send a notification to slack"
;; Wait up to 10000 ms; `result` is ::timeout if nothing was delivered.
(let [{:keys [channel ok] :as result} (deref posted 10000 ::timeout)]
(is (not= ::timeout result))
(is (true? ok))
(is (= (:channel notification) channel))))))))
51 changes: 32 additions & 19 deletions api/test/wfl/unit/slack_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,50 @@
[wfl.tools.fixtures :as fixtures])
(:import [clojure.lang PersistentQueue]))

(def ^:private testing-agent (agent (PersistentQueue/EMPTY)))
(defn ^:private testing-slack-notification
  "Return a fixed notification map for channel C000BOGUS00."
  []
  (let [channel "C000BOGUS00"
        message "WFL Test Message"]
    {:channel channel
     :message message}))
(defn ^:private make-notification
  "Return a notification map for channel C000BOGUS00 whose message is
  \"WFL Test Message \" followed by the string form of `tag`."
  [tag]
  (let [message (str "WFL Test Message " tag)]
    {:channel "C000BOGUS00"
     :message message}))

(deftest test-add-notification
(testing "notification can be added to the agent queue properly"
(let [num-msg 10]
(dotimes [_ num-msg]
(slack/add-notification testing-agent (testing-slack-notification)))
(await testing-agent)
(is (= num-msg (count (seq @testing-agent)))))))
(let [message-count 10
notifier (agent (PersistentQueue/EMPTY))]
(with-redefs [slack/notifier notifier]
(dotimes [n message-count]
(#'slack/queue-notification (make-notification n))))
(is (await-for (* 9 message-count 1000) notifier))
(is (= message-count (count (seq @notifier)))))))

(deftest test-notify-watchers-only-when-enabled
(let [notification (testing-slack-notification)
(deftest test-notify-only-when-enabled
(let [notification (make-notification 'test-notify-only-when-enabled)
workload {:uuid "workload-uuid"
:watchers [["slack" (:channel notification)]]}
message (:message notification)]
(letfn [(mock-add-notification [maybe-enabled]
(fn [_notifier payload]
(if (= "enabled" maybe-enabled)
(do (is (= (:channel notification) (:channel payload)))
(is (str/includes? (:message payload) message)))
(letfn [(mock [maybe-enabled]
(fn [payload]
(when-not (= "enabled" maybe-enabled)
(throw (ex-info "Should not notify"
{:maybe-enabled maybe-enabled})))))
{:maybe-enabled maybe-enabled})))
(is (= (:channel notification) (:channel payload)))
(is (str/includes? (:message payload) message))))
(verify [maybe-enabled]
(fixtures/with-temporary-environment
{slack/enabled-env-var-name maybe-enabled}
#(with-redefs-fn
{#'slack/add-notification (mock-add-notification maybe-enabled)}
(fn [] (slack/notify-watchers workload message)))))]
#(with-redefs
[slack/queue-notification (mock maybe-enabled)]
(slack/notify-watchers workload message))))]
(testing "notifications emitted when feature enabled"
(verify "enabled"))
(testing "notifications not emitted when feature disabled"
(verify "any-other-value-disables-slacking")))))

;; `dispatch-notification` must return the queue with the notification still
;; on it, rather than throw, when posting to Slack fails.
(deftest test-dispatch-does-not-throw
  (let [queue (conj (PersistentQueue/EMPTY)
                    (make-notification 'test-dispatch-does-not-throw))]
    ;; Both stubs accept the (channel message) arguments that `post-message`
    ;; is called with, so each case fails for the reason its assertion names.
    ;; A zero-argument `#(...)` literal would raise ArityException instead.
    (with-redefs
     [slack/post-message (constantly {:ok false :error "something_bad"})]
      (is (= (seq queue) (seq (slack/dispatch-notification queue)))
          "Queue should remain when posting Slack message returns error"))
    (with-redefs
     [slack/post-message (fn [& _]
                           (throw (ex-info "Unexpected throwable" {})))]
      (is (= (seq queue) (seq (slack/dispatch-notification queue)))
          "Queue should remain when posting Slack message throws"))))
17 changes: 17 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,20 @@ in that workload running WGS reprocessing; a workload could also be a queue of
incoming notifications that describe all of the required inputs to launch Arrays
scientific pipelines in Cromwell.

## GitHub Secrets from Vault

When we need to access a Vault secret within GitHub Actions
(e.g., within integration test runs), we should propagate it to a
[GitHub Secret](https://github.com/broadinstitute/wfl/settings/secrets/actions)
managed by Atlantis -- DSP's Terraform deployment server.
The GitHub Secret should then be passed to the action
as an environment variable instead of having the action access Vault
directly, which could risk leaking secrets publicly.

To view or maintain WFL's Atlantis-managed GitHub Secrets, see the
[terraform-ap-deployments](https://github.com/broadinstitute/terraform-ap-deployments/blob/master/github/tfvars/broadinstitute-wfl.tfvars)
repository.

More Information: ["Moving Vault secrets to Github via Atlantis"](https://docs.google.com/document/d/1JbjV4xjAlSOuZY-2bInatl4av3M-y_LmHQkLYyISYns/edit?usp=sharing)

Questions: [#dsp-devops-champions](https://broadinstitute.slack.com/archives/CADM7MZ35)