Skip to content

Commit

Permalink
CMR-10140-1: Adding subscribing to the local and AWS topics and publi…
Browse files Browse the repository at this point in the history
…shing messages.
  • Loading branch information
eereiter committed Oct 30, 2024
1 parent 170ed1b commit b5be135
Show file tree
Hide file tree
Showing 19 changed files with 904 additions and 261 deletions.
8 changes: 6 additions & 2 deletions common-app-lib/src/cmr/common_app/services/cache_info.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
[cmr.common.jobs :refer [defjob]]
[cmr.common.log :refer [debug error]]))

;; This is the cache size map validations. Cache keys can either be keywords
;; or strings.
(s/def ::cache-size-map
(s/and map?
#(every? keyword? (keys %))
#(every? number? (vals %))))
(fn [m]
(every? #(or keyword? %
string? %) (keys m)))
#(every? number? (vals %))))

(defn human-readable-bytes
[size]
Expand Down
9 changes: 4 additions & 5 deletions message-queue-lib/src/cmr/message_queue/pub_sub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
(defn create-topic
"Create a topic using the given topic configuration. The type is determined
by the environment variable CMR_QUEUE_TYPE."
[]
(let [create-fn (case (config/queue-type)
"memory" local-topic/setup-topic nil
"aws" aws-topic/setup-topic (config/cmr-internal-subscriptions-topic-name))]
(create-fn)))
[sns-name]
(case (config/queue-type)
"memory" (local-topic/setup-topic)
"aws" (aws-topic/setup-topic sns-name)))
17 changes: 14 additions & 3 deletions message-queue-lib/src/cmr/message_queue/queue/aws_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
(software.amazon.awssdk.services.sqs.model CreateQueueResponse)
(software.amazon.awssdk.services.sqs.model DeleteMessageRequest)
(software.amazon.awssdk.services.sqs.model DeleteQueueRequest)
(software.amazon.awssdk.services.sqs.model GetQueueAttributesRequest)
(software.amazon.awssdk.services.sqs.model MessageAttributeValue)
(software.amazon.awssdk.services.sqs.model QueueAttributeName)
(software.amazon.awssdk.services.sqs.model ReceiveMessageRequest)
(software.amazon.awssdk.services.sqs.model ReceiveMessageResponse)
(software.amazon.awssdk.services.sqs.model SendMessageRequest)
Expand Down Expand Up @@ -145,11 +147,21 @@
queue-url
(.getMessage e))))))

(defn get-queue-arn
"Gets the SQS ARN value from the queue. We need this value to subscribe the queue to an SNS topic."
[sqs-client queue-url]
(let [sqs-request (-> (GetQueueAttributesRequest/builder)
(.queueUrl queue-url)
(.attributeNames [QueueAttributeName/QUEUE_ARN])
(.build))
response (.getQueueAttributes sqs-client sqs-request)]
(get (.attributesAsStrings response) "QueueArn")))

(comment

(let [sqs-client (create-sqs-client (cmr.message-queue.config/sqs-server-url))
queue-url (create-queue sqs-client (cmr.message-queue.config/cmr-internal-subscriptions-queue-name))
message-attributes (attributes-builder {"collection-concept-id" "C12345-PROV1"})
message-attributes (attributes-builder {"collection-concept-id" "C1200000065-PROV1"})
message "A test message"
_ (publish sqs-client queue-url message message-attributes)
messages (receive-messages sqs-client queue-url)]
Expand All @@ -158,5 +170,4 @@
(println (.receiptHandle %))
(println (.messageAttributes %)))
messages)
(delete-messages sqs-client queue-url messages))
)
(delete-messages sqs-client queue-url messages)))
3 changes: 1 addition & 2 deletions message-queue-lib/src/cmr/message_queue/services/queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
(Thread/sleep (config/messaging-retry-delay))
(recur queue-broker exchange-name msg)))

(declare publish-message)
(declare queue-broker exchange-name msg)
(declare publish-message queue-broker exchange-name msg)
(defn-timed publish-message
"Publishes a message to an exchange Throws a service unavailable error if the message
fails to be put on the queue.
Expand Down
91 changes: 84 additions & 7 deletions message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
(ns cmr.message-queue.topic.aws-topic
"Defines an AWS implementation of the topic protocol."
(:require
[cheshire.core :as json]
[cmr.common.dev.record-pretty-printer :as record-pretty-printer]
[cmr.common.log :refer [error]]
[cmr.common.util :as util]
[cmr.message-queue.config :as config]
[cmr.message-queue.queue.aws-queue :as aws-queue]
[cmr.message-queue.topic.topic-protocol :as topic-protocol])
(:import
(software.amazon.awssdk.regions Region)
(software.amazon.awssdk.services.sns SnsClient)
(software.amazon.awssdk.services.sns.model CreateTopicRequest)
(software.amazon.awssdk.services.sns.model CreateTopicResponse)
(software.amazon.awssdk.services.sns.model MessageAttributeValue)
(software.amazon.awssdk.services.sns.model PublishRequest)))
(software.amazon.awssdk.services.sns.model PublishRequest)
(software.amazon.awssdk.services.sns.model SetSubscriptionAttributesRequest)
(software.amazon.awssdk.services.sns.model SubscribeRequest)
(software.amazon.awssdk.services.sns.model UnsubscribeRequest)))

(defn attribute-builder
"Create an AWS attribute based on the passed in value to use when
Expand Down Expand Up @@ -47,26 +54,93 @@
(recur (rest new-keys) (conj result {attr-key attr-value})))
result)))))

(defn subscribe-sqs-to-sns
"Subscribes an AWS SQS to an AWS Topic."
[sns-client topic-arn sqs-arn]
(let [sub-request (-> (SubscribeRequest/builder)
(.protocol "sqs")
(.endpoint sqs-arn)
(.returnSubscriptionArn true)
(.topicArn topic-arn)
(.build))
response (.subscribe sns-client sub-request)]
(.subscriptionArn response)))

(defn set-filter-policy
"For a given subscription set the filter policy so that the queue
only gets the notificiation messages that it wants. The passed in
filter policy is a hash map - for example:
{\"collection-concept-id\": \"C12345-PROV1\"
\"mode\": [\"New\", \"Update\"]}"
[sns-client subscription-arn subscription]
;; Turn the clojure filter policy to json
(when (or (:CollectionConceptId subscription)
(:Mode subscription))
(let [filters (util/remove-nil-keys
{:collection-concept-id (:CollectionConceptId subscription)
:mode (:Mode subscription)})
filter-json (json/generate-string filters)
sub-filter-request (-> (SetSubscriptionAttributesRequest/builder)
(.subscriptionArn subscription-arn)
(.attributeName "FilterPolicy")
(.attributeValue filter-json)
(.build))]
(.setSubscriptionAttributes sns-client sub-filter-request))))

(defn set-redrive-policy
"For a given subscription set the redrive-policy - which is a dead letter queue if the
message cannot be sent from the SNS to the subscribed endpoint."
[sns-client subscription-arn dead-letter-queue-arn]
(let [redrive-policy (str "{\"deadLetterTargetArn\": \"" dead-letter-queue-arn "\"}")
_ (println "redrive-policy:" redrive-policy)
sqs-request (-> (SetSubscriptionAttributesRequest/builder)
(.subscriptionArn subscription-arn)
(.attributeName "RedrivePolicy")
(.attributeValue redrive-policy)
(.build))]
(.setSubscriptionAttributes sns-client sqs-request)))

(defrecord
AWSTopic
[;; A record containing fields related to accessing SNS topics.
;; Connection to AWS SNS
^SnsClient sns-client
;; The endpoint of the topic to send messages to. For AWS it is the topic ARN,
;; for the in memory implementation it is nil.
topic-arn]
topic-arn

subscription-dead-letter-queue-arn]

;; This will be filled in next sprint. CMR-10141
topic-protocol/Topic
(subscribe
[_this _subscription])
[_this subscription]
(try
(let [subscription-arn (subscribe-sqs-to-sns sns-client topic-arn (get-in subscription [:metadata :EndPoint]))]
(when subscription-arn
(set-filter-policy sns-client subscription-arn subscription)
(set-redrive-policy sns-client subscription-arn subscription-dead-letter-queue-arn))
subscription-arn)
(catch Exception e
(error (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s"
(:EndPoint subscription)
topic-arn
(.getMessage e))))))

(unsubscribe
[_this subscription-id]
(let [sub-request (-> (UnsubscribeRequest/builder)
(.subscriptionArn (:subscription-arn subscription-id))
(.build))]
(.unsubscribe sns-client sub-request))
(:subscription-arn subscription-id))

(publish
[_this message message-attributes]
[_this message message-attributes subject]
(let [msg-atts (attributes-builder message-attributes)
pub-request (-> (PublishRequest/builder)
(.message message)
(.subject (:subject message-attributes))
(.subject subject)
(.topicArn topic-arn)
(.messageAttributes msg-atts)
(.build))]
Expand Down Expand Up @@ -103,8 +177,11 @@
[sns-name]
(println "Setting up AWS-topic")
(let [sns-client (create-sns-client)
topic-arn (create-sns-topic sns-client sns-name)]
(->AWSTopic sns-client topic-arn)))
topic-arn (create-sns-topic sns-client sns-name)
sqs-client (aws-queue/create-sqs-client)
sub-dl-queue-url (aws-queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name))
sub-dl-queue-arn (aws-queue/get-queue-arn sqs-client sub-dl-queue-url)]
(->AWSTopic sns-client topic-arn sub-dl-queue-arn)))

(comment
(def topic (setup-topic "cmr-internal-subscriptions-sit"))
Expand Down
80 changes: 59 additions & 21 deletions message-queue-lib/src/cmr/message_queue/topic/local_topic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
msg-atts (queue/attributes-builder message-attributes)]
(try
(if filter
(when (and (= (:collection-concept-id message-attributes)
(:collection-concept-id filter))
(when (and (= (message-attributes "collection-concept-id")
(filter :collection-concept-id))
(or (nil? (:mode filter))
(some #(= (:mode message-attributes) %) (:mode filter))))
(queue/publish sqs-client queue-url message msg-atts))
(some #(= (message-attributes "mode") %) (:mode filter))))
(queue/publish sqs-client queue-url message msg-atts))
(queue/publish sqs-client queue-url message msg-atts))
(catch SqsException e
(info (format "Exception caught publishing message to %s. Exception: %s. Please check if queue exists. Send message to %s."
Expand All @@ -37,6 +37,24 @@
(.getMessage e)
dead-letter-queue-url))))))))

(defn infrastructure_setup?
"Check to see if the infrastructure has been setup"
[topic]
(seq @(:subscription-atom topic)))

(defn setup-infrastructure
"Set up the local CMR internal subscription queue and dead letter queue and
subscribe then to the passed in topic. This function assumes that elasticmq
is up and running, or that the tests will start one."
[topic]
(when-not (infrastructure_setup? topic)
(let [sqs-client (queue/create-sqs-client (config/sqs-server-url))
subscription {:sqs-client sqs-client
:queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-queue-name))
:dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-dead-letter-queue-name))}]
(queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name))
(swap! (:subscription-atom topic) conj subscription))))

(defrecord
LocalTopic
[;; An atom containing a list of subscriptions. A subscription is a map that
Expand All @@ -45,11 +63,44 @@

topic-protocol/Topic
(subscribe
[_this subscription]
(swap! subscription-atom conj subscription))
[this subscription]
;; to speed up development startup, the setup call is here and setup checks first to see if it is already setup.
;; Otherwise on startup the system would have to wait for the elasticmq to start before it could continue with setting
;; up the database slowing down all the tests.
(setup-infrastructure this)
(let [metadata (:metadata subscription)
sqs-client (queue/create-sqs-client (config/sqs-server-url))
sub {:sqs-client sqs-client
:filter (when (or (:CollectionConceptId metadata)
(:Mode metadata))
{:collection-concept-id (:CollectionConceptId metadata)
:mode (:Mode metadata)})
:queue-url (:EndPoint metadata)
:dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name))
:concept-id (:concept-id subscription)}]
(if-not (seq (filter #(= (:concept-id %) (:concept-id subscription))
@subscription-atom))
(swap! subscription-atom conj sub)
(let [new-subs (filter #(not= (:concept-id %) (:concept-id subscription)) @subscription-atom)]
(reset! subscription-atom (conj new-subs sub))))
;; instead of the full subscription list, pass back the subscription concept id.
(:concept-id subscription)))

(unsubscribe
[_this subscription-id]
;; remove the subscription from the atom and send back the subscription id, not the atom contents.
(swap! subscription-atom (fn [subs]
(doall
(filter #(not= (:concept-id %) (:concept-id subscription-id))
subs))))
(:concept-id subscription-id))

(publish
[_this message message-attributes]
[this message message-attributes _subject]
;; to speed up development startup, the setup call is here and setup checks first to see if it is already setup.
;; Otherwise on startup the system would have to wait for the elasticmq to start before it could continue with setting
;; up the database slowing down all the tests.
(setup-infrastructure this)
(doall (map #(publish-message % message message-attributes) @subscription-atom)))

(health
Expand All @@ -63,21 +114,8 @@
[]
(->LocalTopic (atom '())))

(defn setup-infrastructure
"Set up the local CMR internal subscription queue and dead letter queue and
subscribe then to the passed in topic. This function assumes that elasticmq
is up and running, or that the tests will start one."
[topic]
(let [sqs-client (queue/create-sqs-client (config/sqs-server-url))
queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-queue-name))
dl-queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-dead-letter-queue-name))]
(topic-protocol/subscribe topic {:sqs-client sqs-client
:filter nil
:queue-url queue-url
:dead-letter-queue-url dl-queue-url})))

(comment
(def topic (setup-topic))
(def subscription (setup-infrastructure topic))
(topic-protocol/publish topic "test" {"test" "test"})
(topic-protocol/publish topic "test" {"test" "test"} "test")
(:subscription-atom topic))
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
[this subscription]
"Subscribes to the given topic.")

(unsubscribe
[this subscription-id]
"Unsubscribes to the given topic.")

(publish
[this message message-attributes]
[this message message-attributes subject]
"Publishes a message on the topic. Returns true if the message was
successful. Otherwise returns false.")

Expand Down
14 changes: 8 additions & 6 deletions message-queue-lib/test/cmr/message_queue/test/pub_sub_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,27 @@

(deftest subscribe-queue-to-topic
(when (= "memory" (config/queue-type))
(let [topic (pub-sub/create-topic)
subscription (first (local-topic/setup-infrastructure topic))
(let [topic (pub-sub/create-topic "sns-name")
_ (local-topic/setup-infrastructure topic)
subscription (first @(:subscription-atom topic))
message "test"
message-attributes {"collection-concept-id" "C12345-PROV1"
"mode" "New"}
"mode" "New"}
subject "A new granule"
sqs-client (:sqs-client subscription)
queue-url (:queue-url subscription)
dead-letter-queue-url (:dead-letter-queue-url subscription)]

(is (= (keys subscription) '(:sqs-client :filter :queue-url :dead-letter-queue-url)))
(is (some? (topic-protocol/publish topic message message-attributes)))
(is (= (keys subscription) '(:sqs-client :queue-url :dead-letter-queue-url)))
(is (some? (topic-protocol/publish topic message message-attributes subject)))

(when-let [messages (seq (queue/receive-messages sqs-client queue-url))]
(is (= message (.body (first messages))))
(is (some? (queue/delete-messages (queue/create-sqs-client (config/sqs-server-url)) queue-url messages))))

;; Delete the queue to test the publish sending to dead letter queue.
(queue/delete-queue sqs-client queue-url)
(is (some? (topic-protocol/publish topic message message-attributes)))
(is (some? (topic-protocol/publish topic message message-attributes subject)))
(when-let [messages (seq (queue/receive-messages sqs-client dead-letter-queue-url))]
(is (= message (.body (first messages))))
(is (some? (queue/delete-messages sqs-client dead-letter-queue-url messages)))))))
1 change: 1 addition & 0 deletions metadata-db-app/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
[org.clojars.gjahad/debug-repl "0.3.3"]
[org.clojure/tools.namespace "0.2.11"]
[nasa-cmr/cmr-common-lib "0.1.1-SNAPSHOT"]
[nasa-cmr/cmr-message-queue-lib "0.1.0-SNAPSHOT"]
[pjstadig/humane-test-output "0.9.0"]
[proto-repl "0.3.1"]]
:jvm-opts ^:replace ["-server"]
Expand Down
Loading

0 comments on commit b5be135

Please sign in to comment.