Skip to content

Commit

Permalink
CMR-10140: Adding subscribing to the local and AWS topics and publish…
Browse files Browse the repository at this point in the history
…ing messages. (#2185)

* CMR-10140-1: Adding subscribing to the local and AWS topics and publishing messages.

* CMR-10140-1: fixing database integration tests.

* CMR-10140-1: Fixing email subscription timing and notification time entry into the database.

* CMR-10140: Fixing test

* CMR-10140: Making the cmr_sub_notification last_notified_at nullable
  • Loading branch information
eereiter authored Nov 1, 2024
1 parent 170ed1b commit e7b2c99
Show file tree
Hide file tree
Showing 25 changed files with 965 additions and 303 deletions.
8 changes: 6 additions & 2 deletions common-app-lib/src/cmr/common_app/services/cache_info.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
[cmr.common.jobs :refer [defjob]]
[cmr.common.log :refer [debug error]]))

;; This is the cache size map validations. Cache keys can either be keywords
;; or strings.
(s/def ::cache-size-map
(s/and map?
#(every? keyword? (keys %))
#(every? number? (vals %))))
(fn [m]
(every? #(or keyword? %
string? %) (keys m)))
#(every? number? (vals %))))

(defn human-readable-bytes
[size]
Expand Down
16 changes: 11 additions & 5 deletions ingest-app/src/cmr/ingest/services/subscriptions_helper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,11 @@
#_{:clj-kondo/ignore [:unresolved-var]}
(defn- send-update-subscription-notification-time!
"Fires off an http call to update the time which the subscription last was processed"
[context sub-id]
(debug "send-update-subscription-notification-time with" sub-id)
(search/save-subscription-notification-time context sub-id))
[context sub-id last-notified-time]
(debug "send-update-subscription-notification-time with" sub-id )
(search/save-subscription-notification-time context sub-id last-notified-time))

#_{:clj-kondo/ignore [:unresolved-var]}
(defn- filter-concept-refs-by-subscriber-id
"Takes a list of concept references and a subscriber id and removes any concept that the user does
not have read access to."
Expand Down Expand Up @@ -266,12 +267,17 @@
Sent subscription email to [" email-address "].
\nSubscription email contents: [" email-content "]."))
(when update-notification-time?
(send-update-subscription-notification-time! context sub-id))
(send-update-subscription-notification-time! context sub-id (:end-time subscription)))
(catch Exception e
(error "Exception caught in email subscription: " sub-id "\n\n"
(.getMessage e) "\n\n" e))))))
subscriber-filtered-concept-refs-list))

(defn remove-ingest-subscriptions
"Remove ingest subscriptions since emails are not sent out for those."
[concept]
(:EndPoint (json/decode (:metadata concept) true)))

(defn email-subscription-processing
"Process email subscriptions and send email when found granules matching the collection and queries
in the subscription and were created/updated during the last processing interval."
Expand All @@ -280,7 +286,7 @@
([context revision-date-range]
(let [subscriptions (->> (mdb/find-concepts context {:latest true} :subscription)
(remove :deleted)
(remove #(:endpoint (:extra-fields %)))
(remove remove-ingest-subscriptions)
(map #(select-keys % [:concept-id :extra-fields :metadata])))]
(send-subscription-emails context (process-subscriptions context subscriptions revision-date-range) (nil? revision-date-range)))))

Expand Down
9 changes: 4 additions & 5 deletions message-queue-lib/src/cmr/message_queue/pub_sub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
(defn create-topic
"Create a topic using the given topic configuration. The type is determined
by the environment variable CMR_QUEUE_TYPE."
[]
(let [create-fn (case (config/queue-type)
"memory" local-topic/setup-topic nil
"aws" aws-topic/setup-topic (config/cmr-internal-subscriptions-topic-name))]
(create-fn)))
[sns-name]
(case (config/queue-type)
"memory" (local-topic/setup-topic)
"aws" (aws-topic/setup-topic sns-name)))
17 changes: 14 additions & 3 deletions message-queue-lib/src/cmr/message_queue/queue/aws_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
(software.amazon.awssdk.services.sqs.model CreateQueueResponse)
(software.amazon.awssdk.services.sqs.model DeleteMessageRequest)
(software.amazon.awssdk.services.sqs.model DeleteQueueRequest)
(software.amazon.awssdk.services.sqs.model GetQueueAttributesRequest)
(software.amazon.awssdk.services.sqs.model MessageAttributeValue)
(software.amazon.awssdk.services.sqs.model QueueAttributeName)
(software.amazon.awssdk.services.sqs.model ReceiveMessageRequest)
(software.amazon.awssdk.services.sqs.model ReceiveMessageResponse)
(software.amazon.awssdk.services.sqs.model SendMessageRequest)
Expand Down Expand Up @@ -145,11 +147,21 @@
queue-url
(.getMessage e))))))

(defn get-queue-arn
"Gets the SQS ARN value from the queue. We need this value to subscribe the queue to an SNS topic."
[sqs-client queue-url]
(let [sqs-request (-> (GetQueueAttributesRequest/builder)
(.queueUrl queue-url)
(.attributeNames [QueueAttributeName/QUEUE_ARN])
(.build))
response (.getQueueAttributes sqs-client sqs-request)]
(get (.attributesAsStrings response) "QueueArn")))

(comment

(let [sqs-client (create-sqs-client (cmr.message-queue.config/sqs-server-url))
queue-url (create-queue sqs-client (cmr.message-queue.config/cmr-internal-subscriptions-queue-name))
message-attributes (attributes-builder {"collection-concept-id" "C12345-PROV1"})
message-attributes (attributes-builder {"collection-concept-id" "C1200000065-PROV1"})
message "A test message"
_ (publish sqs-client queue-url message message-attributes)
messages (receive-messages sqs-client queue-url)]
Expand All @@ -158,5 +170,4 @@
(println (.receiptHandle %))
(println (.messageAttributes %)))
messages)
(delete-messages sqs-client queue-url messages))
)
(delete-messages sqs-client queue-url messages)))
3 changes: 1 addition & 2 deletions message-queue-lib/src/cmr/message_queue/services/queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
(Thread/sleep (config/messaging-retry-delay))
(recur queue-broker exchange-name msg)))

(declare publish-message)
(declare queue-broker exchange-name msg)
(declare publish-message queue-broker exchange-name msg)
(defn-timed publish-message
"Publishes a message to an exchange Throws a service unavailable error if the message
fails to be put on the queue.
Expand Down
91 changes: 84 additions & 7 deletions message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
(ns cmr.message-queue.topic.aws-topic
"Defines an AWS implementation of the topic protocol."
(:require
[cheshire.core :as json]
[cmr.common.dev.record-pretty-printer :as record-pretty-printer]
[cmr.common.log :refer [error]]
[cmr.common.util :as util]
[cmr.message-queue.config :as config]
[cmr.message-queue.queue.aws-queue :as aws-queue]
[cmr.message-queue.topic.topic-protocol :as topic-protocol])
(:import
(software.amazon.awssdk.regions Region)
(software.amazon.awssdk.services.sns SnsClient)
(software.amazon.awssdk.services.sns.model CreateTopicRequest)
(software.amazon.awssdk.services.sns.model CreateTopicResponse)
(software.amazon.awssdk.services.sns.model MessageAttributeValue)
(software.amazon.awssdk.services.sns.model PublishRequest)))
(software.amazon.awssdk.services.sns.model PublishRequest)
(software.amazon.awssdk.services.sns.model SetSubscriptionAttributesRequest)
(software.amazon.awssdk.services.sns.model SubscribeRequest)
(software.amazon.awssdk.services.sns.model UnsubscribeRequest)))

(defn attribute-builder
"Create an AWS attribute based on the passed in value to use when
Expand Down Expand Up @@ -47,26 +54,93 @@
(recur (rest new-keys) (conj result {attr-key attr-value})))
result)))))

(defn subscribe-sqs-to-sns
"Subscribes an AWS SQS to an AWS Topic."
[sns-client topic-arn sqs-arn]
(let [sub-request (-> (SubscribeRequest/builder)
(.protocol "sqs")
(.endpoint sqs-arn)
(.returnSubscriptionArn true)
(.topicArn topic-arn)
(.build))
response (.subscribe sns-client sub-request)]
(.subscriptionArn response)))

(defn set-filter-policy
"For a given subscription set the filter policy so that the queue
only gets the notificiation messages that it wants. The passed in
filter policy is a hash map - for example:
{\"collection-concept-id\": \"C12345-PROV1\"
\"mode\": [\"New\", \"Update\"]}"
[sns-client subscription-arn subscription]
;; Turn the clojure filter policy to json
(when (or (:CollectionConceptId subscription)
(:Mode subscription))
(let [filters (util/remove-nil-keys
{:collection-concept-id (:CollectionConceptId subscription)
:mode (:Mode subscription)})
filter-json (json/generate-string filters)
sub-filter-request (-> (SetSubscriptionAttributesRequest/builder)
(.subscriptionArn subscription-arn)
(.attributeName "FilterPolicy")
(.attributeValue filter-json)
(.build))]
(.setSubscriptionAttributes sns-client sub-filter-request))))

(defn set-redrive-policy
"For a given subscription set the redrive-policy - which is a dead letter queue if the
message cannot be sent from the SNS to the subscribed endpoint."
[sns-client subscription-arn dead-letter-queue-arn]
(let [redrive-policy (str "{\"deadLetterTargetArn\": \"" dead-letter-queue-arn "\"}")
_ (println "redrive-policy:" redrive-policy)
sqs-request (-> (SetSubscriptionAttributesRequest/builder)
(.subscriptionArn subscription-arn)
(.attributeName "RedrivePolicy")
(.attributeValue redrive-policy)
(.build))]
(.setSubscriptionAttributes sns-client sqs-request)))

(defrecord
AWSTopic
[;; A record containing fields related to accessing SNS topics.
;; Connection to AWS SNS
^SnsClient sns-client
;; The endpoint of the topic to send messages to. For AWS it is the topic ARN,
;; for the in memory implementation it is nil.
topic-arn]
topic-arn

subscription-dead-letter-queue-arn]

;; This will be filled in next sprint. CMR-10141
topic-protocol/Topic
(subscribe
[_this _subscription])
[_this subscription]
(try
(let [subscription-arn (subscribe-sqs-to-sns sns-client topic-arn (get-in subscription [:metadata :EndPoint]))]
(when subscription-arn
(set-filter-policy sns-client subscription-arn subscription)
(set-redrive-policy sns-client subscription-arn subscription-dead-letter-queue-arn))
subscription-arn)
(catch Exception e
(error (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s"
(:EndPoint subscription)
topic-arn
(.getMessage e))))))

(unsubscribe
[_this subscription-id]
(let [sub-request (-> (UnsubscribeRequest/builder)
(.subscriptionArn (:subscription-arn subscription-id))
(.build))]
(.unsubscribe sns-client sub-request))
(:subscription-arn subscription-id))

(publish
[_this message message-attributes]
[_this message message-attributes subject]
(let [msg-atts (attributes-builder message-attributes)
pub-request (-> (PublishRequest/builder)
(.message message)
(.subject (:subject message-attributes))
(.subject subject)
(.topicArn topic-arn)
(.messageAttributes msg-atts)
(.build))]
Expand Down Expand Up @@ -103,8 +177,11 @@
[sns-name]
(println "Setting up AWS-topic")
(let [sns-client (create-sns-client)
topic-arn (create-sns-topic sns-client sns-name)]
(->AWSTopic sns-client topic-arn)))
topic-arn (create-sns-topic sns-client sns-name)
sqs-client (aws-queue/create-sqs-client)
sub-dl-queue-url (aws-queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name))
sub-dl-queue-arn (aws-queue/get-queue-arn sqs-client sub-dl-queue-url)]
(->AWSTopic sns-client topic-arn sub-dl-queue-arn)))

(comment
(def topic (setup-topic "cmr-internal-subscriptions-sit"))
Expand Down
80 changes: 59 additions & 21 deletions message-queue-lib/src/cmr/message_queue/topic/local_topic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
msg-atts (queue/attributes-builder message-attributes)]
(try
(if filter
(when (and (= (:collection-concept-id message-attributes)
(:collection-concept-id filter))
(when (and (= (message-attributes "collection-concept-id")
(filter :collection-concept-id))
(or (nil? (:mode filter))
(some #(= (:mode message-attributes) %) (:mode filter))))
(queue/publish sqs-client queue-url message msg-atts))
(some #(= (message-attributes "mode") %) (:mode filter))))
(queue/publish sqs-client queue-url message msg-atts))
(queue/publish sqs-client queue-url message msg-atts))
(catch SqsException e
(info (format "Exception caught publishing message to %s. Exception: %s. Please check if queue exists. Send message to %s."
Expand All @@ -37,6 +37,24 @@
(.getMessage e)
dead-letter-queue-url))))))))

(defn infrastructure_setup?
"Check to see if the infrastructure has been setup"
[topic]
(seq @(:subscription-atom topic)))

(defn setup-infrastructure
"Set up the local CMR internal subscription queue and dead letter queue and
subscribe then to the passed in topic. This function assumes that elasticmq
is up and running, or that the tests will start one."
[topic]
(when-not (infrastructure_setup? topic)
(let [sqs-client (queue/create-sqs-client (config/sqs-server-url))
subscription {:sqs-client sqs-client
:queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-queue-name))
:dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-dead-letter-queue-name))}]
(queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name))
(swap! (:subscription-atom topic) conj subscription))))

(defrecord
LocalTopic
[;; An atom containing a list of subscriptions. A subscription is a map that
Expand All @@ -45,11 +63,44 @@

topic-protocol/Topic
(subscribe
[_this subscription]
(swap! subscription-atom conj subscription))
[this subscription]
;; to speed up development startup, the setup call is here and setup checks first to see if it is already setup.
;; Otherwise on startup the system would have to wait for the elasticmq to start before it could continue with setting
;; up the database slowing down all the tests.
(setup-infrastructure this)
(let [metadata (:metadata subscription)
sqs-client (queue/create-sqs-client (config/sqs-server-url))
sub {:sqs-client sqs-client
:filter (when (or (:CollectionConceptId metadata)
(:Mode metadata))
{:collection-concept-id (:CollectionConceptId metadata)
:mode (:Mode metadata)})
:queue-url (:EndPoint metadata)
:dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name))
:concept-id (:concept-id subscription)}]
(if-not (seq (filter #(= (:concept-id %) (:concept-id subscription))
@subscription-atom))
(swap! subscription-atom conj sub)
(let [new-subs (filter #(not= (:concept-id %) (:concept-id subscription)) @subscription-atom)]
(reset! subscription-atom (conj new-subs sub))))
;; instead of the full subscription list, pass back the subscription concept id.
(:concept-id subscription)))

(unsubscribe
[_this subscription-id]
;; remove the subscription from the atom and send back the subscription id, not the atom contents.
(swap! subscription-atom (fn [subs]
(doall
(filter #(not= (:concept-id %) (:concept-id subscription-id))
subs))))
(:concept-id subscription-id))

(publish
[_this message message-attributes]
[this message message-attributes _subject]
;; to speed up development startup, the setup call is here and setup checks first to see if it is already setup.
;; Otherwise on startup the system would have to wait for the elasticmq to start before it could continue with setting
;; up the database slowing down all the tests.
(setup-infrastructure this)
(doall (map #(publish-message % message message-attributes) @subscription-atom)))

(health
Expand All @@ -63,21 +114,8 @@
[]
(->LocalTopic (atom '())))

(defn setup-infrastructure
"Set up the local CMR internal subscription queue and dead letter queue and
subscribe then to the passed in topic. This function assumes that elasticmq
is up and running, or that the tests will start one."
[topic]
(let [sqs-client (queue/create-sqs-client (config/sqs-server-url))
queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-queue-name))
dl-queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-dead-letter-queue-name))]
(topic-protocol/subscribe topic {:sqs-client sqs-client
:filter nil
:queue-url queue-url
:dead-letter-queue-url dl-queue-url})))

(comment
(def topic (setup-topic))
(def subscription (setup-infrastructure topic))
(topic-protocol/publish topic "test" {"test" "test"})
(topic-protocol/publish topic "test" {"test" "test"} "test")
(:subscription-atom topic))
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
[this subscription]
"Subscribes to the given topic.")

(unsubscribe
[this subscription-id]
"Unsubscribes to the given topic.")

(publish
[this message message-attributes]
[this message message-attributes subject]
"Publishes a message on the topic. Returns true if the message was
successful. Otherwise returns false.")

Expand Down
Loading

0 comments on commit e7b2c99

Please sign in to comment.