From e7b2c997df48f1761ed59ada4aa144e2cbcfc171 Mon Sep 17 00:00:00 2001 From: eereiter Date: Fri, 1 Nov 2024 08:41:43 -0400 Subject: [PATCH] CMR-10140: Adding subscribing to the local and AWS topics and publishing messages. (#2185) * CMR-10140-1: Adding subscribing to the local and AWS topics and publishing messages. * CMR-10140-1: fixing database integration tests. * CMR-10140-1: Fixing email subscription timing and notification time entry into the database. * CMR-10140: Fixing test * CMR-10140: Making the cmr_sub_notification last_notified_at nullable --- .../cmr/common_app/services/cache_info.clj | 8 +- .../ingest/services/subscriptions_helper.clj | 16 +- .../src/cmr/message_queue/pub_sub.clj | 9 +- .../src/cmr/message_queue/queue/aws_queue.clj | 17 +- .../src/cmr/message_queue/services/queue.clj | 3 +- .../src/cmr/message_queue/topic/aws_topic.clj | 91 ++- .../cmr/message_queue/topic/local_topic.clj | 80 ++- .../message_queue/topic/topic_protocol.clj | 6 +- .../cmr/message_queue/test/pub_sub_test.clj | 14 +- .../int_test/concepts/concept_delete_spec.clj | 9 +- metadata-db-app/project.clj | 1 + .../src/cmr/metadata_db/api/subscriptions.clj | 13 +- .../cmr/metadata_db/data/ingest_events.clj | 5 +- .../data/oracle/sub_notifications.clj | 34 +- ...091_update_cmr_sub_notifications_table.clj | 18 + .../metadata_db/services/concept_service.clj | 51 +- .../services/sub_notifications.clj | 30 +- .../services/subscription_cache.clj | 7 +- .../metadata_db/services/subscriptions.clj | 197 +++++- .../src/cmr/metadata_db/system.clj | 9 +- .../test/services/subscriptions_test.clj | 609 +++++++++++++----- search-app/project.clj | 1 + .../subscription_processing_test.clj | 28 +- transmit-lib/src/cmr/transmit/search.clj | 9 +- .../test/cmr/transmit/test/search.clj | 3 +- 25 files changed, 965 insertions(+), 303 deletions(-) create mode 100644 metadata-db-app/src/cmr/metadata_db/migrations/091_update_cmr_sub_notifications_table.clj diff --git a/common-app-lib/src/cmr/common_app/services/cache_info.clj b/common-app-lib/src/cmr/common_app/services/cache_info.clj index 4bbce1e7fe..03f73954ce 100644 --- a/common-app-lib/src/cmr/common_app/services/cache_info.clj +++ b/common-app-lib/src/cmr/common_app/services/cache_info.clj @@ -9,10 +9,14 @@ [cmr.common.jobs :refer [defjob]] [cmr.common.log :refer [debug error]])) +;; This is the cache size map validations. Cache keys can either be keywords +;; or strings. (s/def ::cache-size-map (s/and map? - #(every? keyword? (keys %)) - #(every? number? (vals %)))) + (fn [m] + (every? #(or keyword? % + string? %) (keys m))) + #(every? number? (vals %)))) (defn human-readable-bytes [size] diff --git a/ingest-app/src/cmr/ingest/services/subscriptions_helper.clj b/ingest-app/src/cmr/ingest/services/subscriptions_helper.clj index bb6201dadf..3a788c75f6 100644 --- a/ingest-app/src/cmr/ingest/services/subscriptions_helper.clj +++ b/ingest-app/src/cmr/ingest/services/subscriptions_helper.clj @@ -110,10 +110,11 @@ #_{:clj-kondo/ignore [:unresolved-var]} (defn- send-update-subscription-notification-time! "Fires off an http call to update the time which the subscription last was processed" - [context sub-id] - (debug "send-update-subscription-notification-time with" sub-id) - (search/save-subscription-notification-time context sub-id)) + [context sub-id last-notified-time] + (debug "send-update-subscription-notification-time with" sub-id ) + (search/save-subscription-notification-time context sub-id last-notified-time)) +#_{:clj-kondo/ignore [:unresolved-var]} (defn- filter-concept-refs-by-subscriber-id "Takes a list of concept references and a subscriber id and removes any concept that the user does not have read access to." @@ -266,12 +267,17 @@ Sent subscription email to [" email-address "]. \nSubscription email contents: [" email-content "].")) (when update-notification-time? - (send-update-subscription-notification-time! context sub-id)) + (send-update-subscription-notification-time! context sub-id (:end-time subscription))) (catch Exception e (error "Exception caught in email subscription: " sub-id "\n\n" (.getMessage e) "\n\n" e)))))) subscriber-filtered-concept-refs-list)) +(defn remove-ingest-subscriptions + "Remove ingest subscriptions since emails are not sent out for those." + [concept] + (:EndPoint (json/decode (:metadata concept) true))) + (defn email-subscription-processing "Process email subscriptions and send email when found granules matching the collection and queries in the subscription and were created/updated during the last processing interval." @@ -280,7 +286,7 @@ ([context revision-date-range] (let [subscriptions (->> (mdb/find-concepts context {:latest true} :subscription) (remove :deleted) - (remove #(:endpoint (:extra-fields %))) + (remove remove-ingest-subscriptions) (map #(select-keys % [:concept-id :extra-fields :metadata])))] (send-subscription-emails context (process-subscriptions context subscriptions revision-date-range) (nil? revision-date-range))))) diff --git a/message-queue-lib/src/cmr/message_queue/pub_sub.clj b/message-queue-lib/src/cmr/message_queue/pub_sub.clj index bcdb427033..5e0fb08462 100644 --- a/message-queue-lib/src/cmr/message_queue/pub_sub.clj +++ b/message-queue-lib/src/cmr/message_queue/pub_sub.clj @@ -8,8 +8,7 @@ (defn create-topic "Create a topic using the given topic configuration. The type is determined by the environment variable CMR_QUEUE_TYPE." - [] - (let [create-fn (case (config/queue-type) - "memory" local-topic/setup-topic nil - "aws" aws-topic/setup-topic (config/cmr-internal-subscriptions-topic-name))] - (create-fn))) + [sns-name] + (case (config/queue-type) + "memory" (local-topic/setup-topic) + "aws" (aws-topic/setup-topic sns-name))) diff --git a/message-queue-lib/src/cmr/message_queue/queue/aws_queue.clj b/message-queue-lib/src/cmr/message_queue/queue/aws_queue.clj index b4badee9bb..a23abf9979 100644 --- a/message-queue-lib/src/cmr/message_queue/queue/aws_queue.clj +++ b/message-queue-lib/src/cmr/message_queue/queue/aws_queue.clj @@ -11,7 +11,9 @@ (software.amazon.awssdk.services.sqs.model CreateQueueResponse) (software.amazon.awssdk.services.sqs.model DeleteMessageRequest) (software.amazon.awssdk.services.sqs.model DeleteQueueRequest) + (software.amazon.awssdk.services.sqs.model GetQueueAttributesRequest) (software.amazon.awssdk.services.sqs.model MessageAttributeValue) + (software.amazon.awssdk.services.sqs.model QueueAttributeName) (software.amazon.awssdk.services.sqs.model ReceiveMessageRequest) (software.amazon.awssdk.services.sqs.model ReceiveMessageResponse) (software.amazon.awssdk.services.sqs.model SendMessageRequest) @@ -145,11 +147,21 @@ queue-url (.getMessage e)))))) +(defn get-queue-arn + "Gets the SQS ARN value from the queue. We need this value to subscribe the queue to an SNS topic." + [sqs-client queue-url] + (let [sqs-request (-> (GetQueueAttributesRequest/builder) + (.queueUrl queue-url) + (.attributeNames [QueueAttributeName/QUEUE_ARN]) + (.build)) + response (.getQueueAttributes sqs-client sqs-request)] + (get (.attributesAsStrings response) "QueueArn"))) + (comment (let [sqs-client (create-sqs-client (cmr.message-queue.config/sqs-server-url)) queue-url (create-queue sqs-client (cmr.message-queue.config/cmr-internal-subscriptions-queue-name)) - message-attributes (attributes-builder {"collection-concept-id" "C12345-PROV1"}) + message-attributes (attributes-builder {"collection-concept-id" "C1200000065-PROV1"}) message "A test message" _ (publish sqs-client queue-url message message-attributes) messages (receive-messages sqs-client queue-url)] @@ -158,5 +170,4 @@ (println (.receiptHandle %)) (println (.messageAttributes %))) messages) - (delete-messages sqs-client queue-url messages)) - ) \ No newline at end of file + (delete-messages sqs-client queue-url messages))) diff --git a/message-queue-lib/src/cmr/message_queue/services/queue.clj b/message-queue-lib/src/cmr/message_queue/services/queue.clj index 8bcab36979..e4d0720c99 100644 --- a/message-queue-lib/src/cmr/message_queue/services/queue.clj +++ b/message-queue-lib/src/cmr/message_queue/services/queue.clj @@ -38,8 +38,7 @@ (Thread/sleep (config/messaging-retry-delay)) (recur queue-broker exchange-name msg))) -(declare publish-message) -(declare queue-broker exchange-name msg) +(declare publish-message queue-broker exchange-name msg) (defn-timed publish-message "Publishes a message to an exchange Throws a service unavailable error if the message fails to be put on the queue. diff --git a/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj b/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj index 5e66f0df30..3a7d13d847 100644 --- a/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj +++ b/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj @@ -1,8 +1,12 @@ (ns cmr.message-queue.topic.aws-topic "Defines an AWS implementation of the topic protocol." (:require + [cheshire.core :as json] [cmr.common.dev.record-pretty-printer :as record-pretty-printer] [cmr.common.log :refer [error]] + [cmr.common.util :as util] + [cmr.message-queue.config :as config] + [cmr.message-queue.queue.aws-queue :as aws-queue] [cmr.message-queue.topic.topic-protocol :as topic-protocol]) (:import (software.amazon.awssdk.regions Region) @@ -10,7 +14,10 @@ (software.amazon.awssdk.services.sns.model CreateTopicRequest) (software.amazon.awssdk.services.sns.model CreateTopicResponse) (software.amazon.awssdk.services.sns.model MessageAttributeValue) - (software.amazon.awssdk.services.sns.model PublishRequest))) + (software.amazon.awssdk.services.sns.model PublishRequest) + (software.amazon.awssdk.services.sns.model SetSubscriptionAttributesRequest) + (software.amazon.awssdk.services.sns.model SubscribeRequest) + (software.amazon.awssdk.services.sns.model UnsubscribeRequest))) (defn attribute-builder "Create an AWS attribute based on the passed in value to use when @@ -47,6 +54,52 @@ (recur (rest new-keys) (conj result {attr-key attr-value}))) result))))) +(defn subscribe-sqs-to-sns + "Subscribes an AWS SQS to an AWS Topic." + [sns-client topic-arn sqs-arn] + (let [sub-request (-> (SubscribeRequest/builder) + (.protocol "sqs") + (.endpoint sqs-arn) + (.returnSubscriptionArn true) + (.topicArn topic-arn) + (.build)) + response (.subscribe sns-client sub-request)] + (.subscriptionArn response))) + +(defn set-filter-policy + "For a given subscription set the filter policy so that the queue + only gets the notificiation messages that it wants. The passed in + filter policy is a hash map - for example: + {\"collection-concept-id\": \"C12345-PROV1\" + \"mode\": [\"New\", \"Update\"]}" + [sns-client subscription-arn subscription] + ;; Turn the clojure filter policy to json + (when (or (:CollectionConceptId subscription) + (:Mode subscription)) + (let [filters (util/remove-nil-keys + {:collection-concept-id (:CollectionConceptId subscription) + :mode (:Mode subscription)}) + filter-json (json/generate-string filters) + sub-filter-request (-> (SetSubscriptionAttributesRequest/builder) + (.subscriptionArn subscription-arn) + (.attributeName "FilterPolicy") + (.attributeValue filter-json) + (.build))] + (.setSubscriptionAttributes sns-client sub-filter-request)))) + +(defn set-redrive-policy + "For a given subscription set the redrive-policy - which is a dead letter queue if the + message cannot be sent from the SNS to the subscribed endpoint." + [sns-client subscription-arn dead-letter-queue-arn] + (let [redrive-policy (str "{\"deadLetterTargetArn\": \"" dead-letter-queue-arn "\"}") + _ (println "redrive-policy:" redrive-policy) + sqs-request (-> (SetSubscriptionAttributesRequest/builder) + (.subscriptionArn subscription-arn) + (.attributeName "RedrivePolicy") + (.attributeValue redrive-policy) + (.build))] + (.setSubscriptionAttributes sns-client sqs-request))) + (defrecord AWSTopic [;; A record containing fields related to accessing SNS topics. @@ -54,19 +107,40 @@ ^SnsClient sns-client ;; The endpoint of the topic to send messages to. For AWS it is the topic ARN, ;; for the in memory implementation it is nil. - topic-arn] + topic-arn + + subscription-dead-letter-queue-arn] ;; This will be filled in next sprint. CMR-10141 topic-protocol/Topic (subscribe - [_this _subscription]) + [_this subscription] + (try + (let [subscription-arn (subscribe-sqs-to-sns sns-client topic-arn (get-in subscription [:metadata :EndPoint]))] + (when subscription-arn + (set-filter-policy sns-client subscription-arn subscription) + (set-redrive-policy sns-client subscription-arn subscription-dead-letter-queue-arn)) + subscription-arn) + (catch Exception e + (error (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s" + (:EndPoint subscription) + topic-arn + (.getMessage e)))))) + + (unsubscribe + [_this subscription-id] + (let [sub-request (-> (UnsubscribeRequest/builder) + (.subscriptionArn (:subscription-arn subscription-id)) + (.build))] + (.unsubscribe sns-client sub-request)) + (:subscription-arn subscription-id)) (publish - [_this message message-attributes] + [_this message message-attributes subject] (let [msg-atts (attributes-builder message-attributes) pub-request (-> (PublishRequest/builder) (.message message) - (.subject (:subject message-attributes)) + (.subject subject) (.topicArn topic-arn) (.messageAttributes msg-atts) (.build))] @@ -103,8 +177,11 @@ [sns-name] (println "Setting up AWS-topic") (let [sns-client (create-sns-client) - topic-arn (create-sns-topic sns-client sns-name)] - (->AWSTopic sns-client topic-arn))) + topic-arn (create-sns-topic sns-client sns-name) + sqs-client (aws-queue/create-sqs-client) + sub-dl-queue-url (aws-queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name)) + sub-dl-queue-arn (aws-queue/get-queue-arn sqs-client sub-dl-queue-url)] + (->AWSTopic sns-client topic-arn sub-dl-queue-arn))) (comment (def topic (setup-topic "cmr-internal-subscriptions-sit")) diff --git a/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj b/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj index 8803e5b056..d92a0d714f 100644 --- a/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj +++ b/message-queue-lib/src/cmr/message_queue/topic/local_topic.clj @@ -19,11 +19,11 @@ msg-atts (queue/attributes-builder message-attributes)] (try (if filter - (when (and (= (:collection-concept-id message-attributes) - (:collection-concept-id filter)) + (when (and (= (message-attributes "collection-concept-id") + (filter :collection-concept-id)) (or (nil? (:mode filter)) - (some #(= (:mode message-attributes) %) (:mode filter)))) - (queue/publish sqs-client queue-url message msg-atts)) + (some #(= (message-attributes "mode") %) (:mode filter)))) + (queue/publish sqs-client queue-url message msg-atts)) (queue/publish sqs-client queue-url message msg-atts)) (catch SqsException e (info (format "Exception caught publishing message to %s. Exception: %s. Please check if queue exists. Send message to %s." @@ -37,6 +37,24 @@ (.getMessage e) dead-letter-queue-url)))))))) +(defn infrastructure_setup? + "Check to see if the infrastructure has been setup" + [topic] + (seq @(:subscription-atom topic))) + +(defn setup-infrastructure + "Set up the local CMR internal subscription queue and dead letter queue and + subscribe then to the passed in topic. This function assumes that elasticmq + is up and running, or that the tests will start one." + [topic] + (when-not (infrastructure_setup? topic) + (let [sqs-client (queue/create-sqs-client (config/sqs-server-url)) + subscription {:sqs-client sqs-client + :queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-queue-name)) + :dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-dead-letter-queue-name))}] + (queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name)) + (swap! (:subscription-atom topic) conj subscription)))) + (defrecord LocalTopic [;; An atom containing a list of subscriptions. A subscription is a map that @@ -45,11 +63,44 @@ topic-protocol/Topic (subscribe - [_this subscription] - (swap! subscription-atom conj subscription)) + [this subscription] + ;; to speed up development startup, the setup call is here and setup checks first to see if it is already setup. + ;; Otherwise on startup the system would have to wait for the elasticmq to start before it could continue with setting + ;; up the database slowing down all the tests. + (setup-infrastructure this) + (let [metadata (:metadata subscription) + sqs-client (queue/create-sqs-client (config/sqs-server-url)) + sub {:sqs-client sqs-client + :filter (when (or (:CollectionConceptId metadata) + (:Mode metadata)) + {:collection-concept-id (:CollectionConceptId metadata) + :mode (:Mode metadata)}) + :queue-url (:EndPoint metadata) + :dead-letter-queue-url (queue/create-queue sqs-client (config/cmr-subscriptions-dead-letter-queue-name)) + :concept-id (:concept-id subscription)}] + (if-not (seq (filter #(= (:concept-id %) (:concept-id subscription)) + @subscription-atom)) + (swap! subscription-atom conj sub) + (let [new-subs (filter #(not= (:concept-id %) (:concept-id subscription)) @subscription-atom)] + (reset! subscription-atom (conj new-subs sub)))) + ;; instead of the full subscription list, pass back the subscription concept id. + (:concept-id subscription))) + + (unsubscribe + [_this subscription-id] + ;; remove the subscription from the atom and send back the subscription id, not the atom contents. + (swap! subscription-atom (fn [subs] + (doall + (filter #(not= (:concept-id %) (:concept-id subscription-id)) + subs)))) + (:concept-id subscription-id)) (publish - [_this message message-attributes] + [this message message-attributes _subject] + ;; to speed up development startup, the setup call is here and setup checks first to see if it is already setup. + ;; Otherwise on startup the system would have to wait for the elasticmq to start before it could continue with setting + ;; up the database slowing down all the tests. + (setup-infrastructure this) (doall (map #(publish-message % message message-attributes) @subscription-atom))) (health @@ -63,21 +114,8 @@ [] (->LocalTopic (atom '()))) -(defn setup-infrastructure - "Set up the local CMR internal subscription queue and dead letter queue and - subscribe then to the passed in topic. This function assumes that elasticmq - is up and running, or that the tests will start one." - [topic] - (let [sqs-client (queue/create-sqs-client (config/sqs-server-url)) - queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-queue-name)) - dl-queue-url (queue/create-queue sqs-client (config/cmr-internal-subscriptions-dead-letter-queue-name))] - (topic-protocol/subscribe topic {:sqs-client sqs-client - :filter nil - :queue-url queue-url - :dead-letter-queue-url dl-queue-url}))) - (comment (def topic (setup-topic)) (def subscription (setup-infrastructure topic)) - (topic-protocol/publish topic "test" {"test" "test"}) + (topic-protocol/publish topic "test" {"test" "test"} "test") (:subscription-atom topic)) diff --git a/message-queue-lib/src/cmr/message_queue/topic/topic_protocol.clj b/message-queue-lib/src/cmr/message_queue/topic/topic_protocol.clj index 305db175ce..6e568e7955 100644 --- a/message-queue-lib/src/cmr/message_queue/topic/topic_protocol.clj +++ b/message-queue-lib/src/cmr/message_queue/topic/topic_protocol.clj @@ -8,8 +8,12 @@ [this subscription] "Subscribes to the given topic.") + (unsubscribe + [this subscription-id] + "Unsubscribes to the given topic.") + (publish - [this message message-attributes] + [this message message-attributes subject] "Publishes a message on the topic. Returns true if the message was successful. Otherwise returns false.") diff --git a/message-queue-lib/test/cmr/message_queue/test/pub_sub_test.clj b/message-queue-lib/test/cmr/message_queue/test/pub_sub_test.clj index 1a4a3a7a76..f44d332c35 100644 --- a/message-queue-lib/test/cmr/message_queue/test/pub_sub_test.clj +++ b/message-queue-lib/test/cmr/message_queue/test/pub_sub_test.clj @@ -91,17 +91,19 @@ (deftest subscribe-queue-to-topic (when (= "memory" (config/queue-type)) - (let [topic (pub-sub/create-topic) - subscription (first (local-topic/setup-infrastructure topic)) + (let [topic (pub-sub/create-topic "sns-name") + _ (local-topic/setup-infrastructure topic) + subscription (first @(:subscription-atom topic)) message "test" message-attributes {"collection-concept-id" "C12345-PROV1" - "mode" "New"} + "mode" "New"} + subject "A new granule" sqs-client (:sqs-client subscription) queue-url (:queue-url subscription) dead-letter-queue-url (:dead-letter-queue-url subscription)] - (is (= (keys subscription) '(:sqs-client :filter :queue-url :dead-letter-queue-url))) - (is (some? (topic-protocol/publish topic message message-attributes))) + (is (= (keys subscription) '(:sqs-client :queue-url :dead-letter-queue-url))) + (is (some? (topic-protocol/publish topic message message-attributes subject))) (when-let [messages (seq (queue/receive-messages sqs-client queue-url))] (is (= message (.body (first messages)))) @@ -109,7 +111,7 @@ ;; Delete the queue to test the publish sending to dead letter queue. (queue/delete-queue sqs-client queue-url) - (is (some? (topic-protocol/publish topic message message-attributes))) + (is (some? (topic-protocol/publish topic message message-attributes subject))) (when-let [messages (seq (queue/receive-messages sqs-client dead-letter-queue-url))] (is (= message (.body (first messages)))) (is (some? (queue/delete-messages sqs-client dead-letter-queue-url messages))))))) diff --git a/metadata-db-app/int-test/cmr/metadata_db/int_test/concepts/concept_delete_spec.clj b/metadata-db-app/int-test/cmr/metadata_db/int_test/concepts/concept_delete_spec.clj index ac3613eb39..191054195c 100644 --- a/metadata-db-app/int-test/cmr/metadata_db/int_test/concepts/concept_delete_spec.clj +++ b/metadata-db-app/int-test/cmr/metadata_db/int_test/concepts/concept_delete_spec.clj @@ -19,14 +19,17 @@ {concept-id2 :concept-id revision-id2 :revision-id} (util/save-concept concept2) {:keys [status revision-id] :as tombstone} (util/delete-concept concept-id) deleted-concept1 (:concept (util/get-concept-by-id-and-revision concept-id revision-id)) - saved-concept1 (:concept (util/get-concept-by-id-and-revision concept-id (dec revision-id)))] + saved-concept1 (:concept (util/get-concept-by-id-and-revision concept-id (dec revision-id))) + metadata (if (= :subscription concept-type) + (:metadata saved-concept1) + "")] (is (= {:status 201 :revision-id 4} {:status status :revision-id revision-id})) (is (= (dissoc (assoc saved-concept1 :deleted true - :metadata "" :revision-id revision-id + :metadata metadata :user-id nil) :revision-date :user-id :transaction-id) (dissoc deleted-concept1 :revision-date :user-id :transaction-id))) @@ -88,7 +91,7 @@ (is (= {:status 201 :revision-id 4 :deleted true - :metadata ""} + :metadata (:metadata stored-concept1)} {:status status :revision-id revision-id :deleted (:deleted stored-concept1) diff --git a/metadata-db-app/project.clj b/metadata-db-app/project.clj index df2b0e76cd..51d11b8150 100644 --- a/metadata-db-app/project.clj +++ b/metadata-db-app/project.clj @@ -37,6 +37,7 @@ [org.clojars.gjahad/debug-repl "0.3.3"] [org.clojure/tools.namespace "0.2.11"] [nasa-cmr/cmr-common-lib "0.1.1-SNAPSHOT"] + [nasa-cmr/cmr-message-queue-lib "0.1.0-SNAPSHOT"] [pjstadig/humane-test-output "0.9.0"] [proto-repl "0.3.1"]] :jvm-opts ^:replace ["-server"] diff --git a/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj b/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj index 9bc68650ba..5f0b11a3e8 100644 --- a/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj +++ b/metadata-db-app/src/cmr/metadata_db/api/subscriptions.clj @@ -1,15 +1,21 @@ (ns cmr.metadata-db.api.subscriptions "Defines the HTTP URL routes for the application as related to subscriptions." (:require + [cheshire.core :as json] + [clojure.string :as string] [cmr.metadata-db.services.sub-notifications :as sub-note] [cmr.metadata-db.services.subscriptions :as subscriptions] [compojure.core :refer [PUT POST context]])) (defn- update-subscription-notification-time "Update a subscription notification time" - [context params] + [context params body] (let [sub-id (:subscription-concept-id params) - _ (sub-note/update-subscription-notification context sub-id)] + last-notified-time (-> (slurp body) + (string/trim) + (json/decode true) + (get :last-notified-time))] + (sub-note/update-subscription-notification context sub-id last-notified-time) {:status 204})) (def subscription-api-routes @@ -17,8 +23,9 @@ ;; receive notification to update subscription time (PUT "/:subscription-concept-id/notification-time" {params :params + body :body request-context :request-context} - (update-subscription-notification-time request-context params)) + (update-subscription-notification-time request-context params body)) (POST "/refresh-subscription-cache" {request-context :request-context} (subscriptions/refresh-subscription-cache request-context)))) diff --git a/metadata-db-app/src/cmr/metadata_db/data/ingest_events.clj b/metadata-db-app/src/cmr/metadata_db/data/ingest_events.clj index 45c0bb46a0..2c5934b51a 100644 --- a/metadata-db-app/src/cmr/metadata_db/data/ingest_events.clj +++ b/metadata-db-app/src/cmr/metadata_db/data/ingest_events.clj @@ -2,7 +2,6 @@ "Allows broadcast of ingest events via the message queue" (:require [cmr.common.concepts :as cc] - [cmr.common.log :as log :refer (debug info warn error)] [cmr.common.services.errors :as errors] [cmr.message-queue.services.queue :as queue] [cmr.metadata-db.config :as config])) @@ -56,7 +55,7 @@ (defn concept-delete-event "Creates an event representing a concept being deleted." - [{:keys [concept-id revision-id] :as concept}] + [{:keys [concept-id revision-id]}] {:action :concept-delete :concept-id concept-id :revision-id revision-id}) @@ -82,7 +81,7 @@ (defn publish-tombstone-delete-msg "Publishes a message indicating a tombstone was removed/overwritten with updated concept" - [context concept-type concept-id revision-id] + [context concept-id revision-id] (when (config/publish-messages) (let [queue-broker (get-in context [:system :queue-broker]) exchange-name (config/deleted-granule-exchange-name) diff --git a/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj b/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj index ac6f90f9a0..e14df1d118 100644 --- a/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj +++ b/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj @@ -3,7 +3,6 @@ (:require [clj-time.coerce :as cr] [clojure.java.jdbc :as j] - [cmr.common.time-keeper :as t] ;; don't use clj-time [cmr.oracle.connection :as oracle])) ; A note about prepared statments, with j/query using ? and [] is the same as @@ -13,10 +12,12 @@ (defn dbresult->sub-notification "Converts a map result from the database to a provider map" [db data] - (let [{:keys [subscription_concept_id last_notified_at]} data] + (let [{:keys [subscription_concept_id last_notified_at aws_arn]} data] (j/with-db-transaction [conn db] {:subscription-concept-id subscription_concept_id - :last-notified-at (oracle/oracle-timestamp->str-time conn last_notified_at)}))) + :last-notified-at (when last_notified_at + (oracle/oracle-timestamp->str-time conn last_notified_at)) + :aws-arn aws_arn}))) (defn subscription-exists? "Check to see if the subscription exists" @@ -38,7 +39,7 @@ (defn get-sub-notification "Get subscription notification from Oracle." [db subscription-id] - (let [sql (str "SELECT id, subscription_concept_id, last_notified_at " + (let [sql (str "SELECT id, subscription_concept_id, last_notified_at, aws_arn " "FROM cmr_sub_notifications " "WHERE subscription_concept_id = ?") results (first (j/query db [sql subscription-id]))] @@ -48,18 +49,28 @@ "Create subscription notification record in Oracle." [db subscription-id] (let [sql (str "INSERT INTO cmr_sub_notifications" - "(id, subscription_concept_id)" - "VALUES (cmr_sub_notifications_seq.nextval, ?)")] + "(id, subscription_concept_id, last_notified_at)" + "VALUES (cmr_sub_notifications_seq.nextval, ?, NULL)")] (j/db-do-prepared db sql [subscription-id]))) (defn update-sub-notification "Update a subscription notification in Oracle." - [db subscription-id] + [db subscription-id last-notified-time] (let [sql (str "UPDATE cmr_sub_notifications " "SET last_notified_at = ? " - "WHERE subscription_concept_id = ?") - now (t/now)] - (j/db-do-prepared db sql [(cr/to-sql-time now) subscription-id]))) + "WHERE subscription_concept_id = ?")] + (j/db-do-prepared db sql [(cr/to-sql-time last-notified-time) subscription-id]))) + +(defn update-sub-not-with-aws-arn + "Updates the subscription notification with the subscription arn. + If the subscription doesn't exist then create it as well." + [db subscription-id aws-arn] + (when-not (sub-notification-exists? db subscription-id) + (save-sub-notification db subscription-id)) + (let [sql (str "UPDATE cmr_sub_notifications " + "SET aws_arn = ? " + "WHERE subscription_concept_id = ?")] + (j/db-do-prepared db sql [aws-arn subscription-id]))) (defn delete-sub-notification "Delete a subscription notification record by id" @@ -78,5 +89,6 @@ (println (save-sub-notification db "SUB1234-test")) (println (sub-notification-exists? db "SUB1234-test")) (println (get-sub-notification db "SUB1234-test")) - (println (update-sub-notification db "SUB1234-test")) + (println (update-sub-notification db "SUB1234-test" "2024-11-01T02:17:09.749Z")) + (println (update-sub-not-with-aws-arn db "SUB1234-test" "arn:aws:sns:us-east-1:1234455667:SometestSubscription")) (println (delete-sub-notification db "SUB1234-test")) ) diff --git a/metadata-db-app/src/cmr/metadata_db/migrations/091_update_cmr_sub_notifications_table.clj b/metadata-db-app/src/cmr/metadata_db/migrations/091_update_cmr_sub_notifications_table.clj new file mode 100644 index 0000000000..2cf4e110f2 --- /dev/null +++ b/metadata-db-app/src/cmr/metadata_db/migrations/091_update_cmr_sub_notifications_table.clj @@ -0,0 +1,18 @@ +(ns cmr.metadata-db.migrations.091-update-cmr-sub-notifications-table + "Add a column to the table to store AWS subscription arns." + (:require + [config.mdb-migrate-helper :as h])) + +(defn up + "Migrates the database up to version 91." + [] + (println "cmr.metadata-db.migrations.091-update-cmr-sub-notifications-table up...") + (h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS ADD AWS_ARN VARCHAR(2048) NULL") + (h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS MODIFY LAST_NOTIFIED_AT TIMESTAMP WITH TIME ZONE NULL")) + +(defn down + "Migrates the database down from version 91." + [] + (println "cmr.metadata-db.migrations.091-update-cmr-sub-notifications-table down.") + (h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS DROP COLUMN AWS_ARN")) + ;; When the database has null values, we cannot make them null so the column will not change back. diff --git a/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj b/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj index 3303da28a0..715e2d66d7 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj @@ -1,7 +1,6 @@ (ns cmr.metadata-db.services.concept-service "Services to support the business logic of the metadata db." (:require - [cheshire.core :as json] [clj-time.core :as t] [clojure.set :as set] [cmr.common.api.context :as cmn-context] @@ -23,6 +22,7 @@ [cmr.metadata-db.services.provider-service :as provider-service] [cmr.metadata-db.services.provider-validation :as pv] [cmr.metadata-db.services.search-service :as search] + [cmr.metadata-db.services.sub-notifications :as sub-not] [cmr.metadata-db.services.subscriptions :as subscriptions] [cmr.metadata-db.services.util :as util]) ;; Required to get code loaded @@ -762,33 +762,19 @@ "Helper function to create a tombstone concept by merging in parts of the previous concept to the tombstone." [metadata concept previous-revision] - (let [{:keys [concept-id revision-id revision-date user-id]} concept - {:keys [concept-type _]} (cu/parse-concept-id concept-id)] - (if (= :subscription concept-type) - (let [metadata-edn (json/decode metadata true) - extra-fields (:extra-fields previous-revision)] - (merge previous-revision {:concept-id concept-id - :revision-id revision-id - :revision-date revision-date - :user-id user-id - :metadata "" - :deleted true - :extra-fields (merge extra-fields - {:endpoint (:EndPoint metadata-edn) - :mode (:Mode metadata-edn) - :method (:Method metadata-edn)})})) - (merge previous-revision {:concept-id concept-id - :revision-id revision-id - :revision-date revision-date - :user-id user-id - :metadata metadata - :deleted true})))) + (let [{:keys [concept-id revision-id revision-date user-id]} concept] + (merge previous-revision {:concept-id concept-id + :revision-id revision-id + :revision-date revision-date + :user-id user-id + :metadata metadata + :deleted true}))) ;; true implies creation of tombstone for the revision (defmethod save-concept-revision true [context concept] (cv/validate-tombstone-request concept) - (let [{:keys [concept-id revision-id revision-date user-id skip-publication]} concept + (let [{:keys [concept-id revision-id skip-publication]} concept {:keys [concept-type provider-id]} (cu/parse-concept-id concept-id) db (util/context->db context) provider (provider-service/get-provider-by-id context provider-id true) @@ -807,6 +793,9 @@ (= :subscription concept-type)) (:metadata previous-revision) "") + subscription-arn (when (and subscriptions/subscriptions-enabled? + (= :subscription concept-type)) + (sub-not/get-subscription-aws-arn context concept-id)) tombstone (create-tombstone-concept metadata concept previous-revision)] (cv/validate-concept tombstone) (validate-concept-revision-id db provider tombstone previous-revision) @@ -852,7 +841,10 @@ (= concept-type :tool-association)) (ingest-events/publish-event context (ingest-events/concept-delete-event revisioned-tombstone))) - (subscriptions/change-subscription context concept-type revisioned-tombstone) + (when (and subscriptions/subscriptions-enabled? + (= :subscription concept-type)) + (subscriptions/delete-subscription context revisioned-tombstone subscription-arn)) + (subscriptions/work-potential-notification context revisioned-tombstone) revisioned-tombstone))) (if revision-id (cmsg/data-error :not-found @@ -930,8 +922,7 @@ (> revision-id 1)) (let [previous-concept (c/get-concept db concept-type provider concept-id (- revision-id 1))] (when (util/is-tombstone? previous-concept) - (ingest-events/publish-tombstone-delete-msg - context concept-type concept-id revision-id)))) + (ingest-events/publish-tombstone-delete-msg context concept-id revision-id)))) ;; publish service/tool associations update event if applicable, i.e. when the concept is a service/tool, ;; so that the collections can be updated in elasticsearch with the updated service/tool info @@ -940,7 +931,11 @@ (ingest-events/publish-event context (ingest-events/concept-update-event concept)) - (subscriptions/change-subscription context concept-type concept) + (when (and subscriptions/subscriptions-enabled? + (= :subscription concept-type)) + (when-let [subscription-arn (subscriptions/add-subscription context concept)] + (sub-not/update-subscription-with-aws-arn context concept-id subscription-arn))) + (subscriptions/work-potential-notification context concept) concept))) (defn- delete-associated-tag-associations @@ -1114,7 +1109,7 @@ (= :granule concept-type)) ;; Remove any reference to granule from deleted-granule index (doseq [[concept-id revision-id] concept-id-revision-id-tuples] - (ingest-events/publish-tombstone-delete-msg context concept-type concept-id revision-id))) + (ingest-events/publish-tombstone-delete-msg context concept-id revision-id))) (doseq [[concept-id revision-id] concept-id-revision-id-tuples] ;; performs the cascading delete actions first (force-delete-cascading-events context concept-type concept-id (long revision-id))) diff --git a/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj b/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj index 583925a062..b66d0d4f70 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj @@ -11,21 +11,43 @@ "Do the work for updating the subscription notificitation time in the database. The record is lazly created, if a subscription exists, but not notification record then create the notification, otherwise update the existing one." - [db subscription-id] + [db subscription-id last-notified-time] (if (sub-note/subscription-exists? db subscription-id) (if (sub-note/sub-notification-exists? db subscription-id) - (sub-note/update-sub-notification db subscription-id) + (sub-note/update-sub-notification db subscription-id last-notified-time) (sub-note/save-sub-notification db subscription-id)) (errors/throw-service-error :not-found (msg/subscription-not-found subscription-id)))) (defn update-subscription-notification "update a subscription notification record, creating one if needed, complain if subscription id is not valid or not found" - [context subscription-id] + [context subscription-id last-notified-time] (let [errors (common-concepts/concept-id-validation subscription-id) db (mdb-util/context->db context)] (if (nil? errors) - (update-subscription-notification-time-in-database db subscription-id) + (update-subscription-notification-time-in-database db subscription-id last-notified-time) (errors/throw-service-error :not-found (msg/subscription-not-found subscription-id))))) + +(defn update-subscription-with-aws-arn + "Update the sub_notifications DB table with the subscription arn value." + [context subscription-id subscription-arn] + (let [errors (common-concepts/concept-id-validation subscription-id) + db (mdb-util/context->db context)] + (if (nil? errors) + (sub-note/update-sub-not-with-aws-arn db subscription-id subscription-arn) + (errors/throw-service-error + :not-found + (msg/subscription-not-found subscription-id))))) + +(defn get-subscription-aws-arn + "Get the subscription ARN value from the sub_notifications DB table." + [context subscription-id] + (let [errors (common-concepts/concept-id-validation subscription-id) + db (mdb-util/context->db context)] + (if (nil? errors) + (:aws-arn (sub-note/get-sub-notification db subscription-id)) + (errors/throw-service-error + :not-found + (msg/subscription-not-found subscription-id))))) diff --git a/metadata-db-app/src/cmr/metadata_db/services/subscription_cache.clj b/metadata-db-app/src/cmr/metadata_db/services/subscription_cache.clj index 8d2178c89c..785333b37c 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/subscription_cache.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/subscription_cache.clj @@ -27,8 +27,11 @@ (defn set-value "Set the collection concept id and its subscription map described at the top." [context field value] - (let [cache-client (hash-cache/context->cache context subscription-cache-key)] - (hash-cache/set-value cache-client subscription-cache-key field value))) + (let [cache-client (hash-cache/context->cache context subscription-cache-key) + [tm result] (util/time-execution + (hash-cache/set-value cache-client subscription-cache-key field value))] + (rl-util/log-redis-write-complete "ingest-subscription-cache set-value" subscription-cache-key tm) + result)) (defn get-value "Returns the collection-concept-id subscription map which is described at the top." diff --git a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj index 76fb148cb0..7230c31f70 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj @@ -2,21 +2,23 @@ "Buisness logic for subscription processing." (:require [cheshire.core :as json] + [cmr.common.log :refer [debug info]] + [cmr.message-queue.topic.topic-protocol :as topic-protocol] [cmr.metadata-db.config :as mdb-config] [cmr.metadata-db.services.search-service :as mdb-search] - [cmr.metadata-db.services.subscription-cache :as subscription-cache])) + [cmr.metadata-db.services.subscription-cache :as subscription-cache] + [cmr.transmit.config :as t-config])) (def subscriptions-enabled? "Checks to see if ingest subscriptions are enabled." (mdb-config/ingest-subscription-enabled)) -(defn subscription-concept? - "Checks to see if the passed in concept-type and concept is a subscription concept." - [concept-type concept] - (and subscriptions-enabled? - (= :subscription concept-type) - (some? (:endpoint (:extra-fields concept))) - (= "ingest" (:method (:extra-fields concept))))) +(defn ingest-subscription-concept? + "Checks to see if the concept is a ingest subscription concept." + [concept-edn] + (let [metadata (:metadata concept-edn)] + (and (some? (:EndPoint metadata)) + (= "ingest" (:Method metadata))))) (defn granule-concept? "Checks to see if the passed in concept-type and concept is a granule concept." @@ -24,6 +26,10 @@ (and subscriptions-enabled? (= :granule concept-type))) +;; +;; The functions below are for adding and deleting subscriptions to the cache. +;; + (defn ^:dynamic get-subscriptions-from-db "Get the subscriptions from the database. This function primarily exists so that it can be stubbed out for unit tests." @@ -36,6 +42,20 @@ :concept-type :subscription :collection-concept-id coll-concept-id}))) +(defn convert-concept-to-edn + "Converts the passed in concept to edn" + [subscription] + (update subscription :metadata #(json/decode % true))) + +(defn convert-and-filter-subscriptions + "Convert the metadata of the subscriptions to edn and then filter out the non + ingest subscriptions." + [subscriptions] + (let [subs (map convert-concept-to-edn subscriptions)] + (filter #(and (ingest-subscription-concept? %) + (= false (:deleted %))) + subs))) + (defn add-to-existing-mode "Depending on the passed in new-mode [\"New\" \"Update\"] create a structure that merges the new mode to the existing mode. The result looks like [\"New\" \"Update\"]" @@ -61,20 +81,54 @@ (let [sub (first subs)] (if (nil? sub) result - (recur (rest subs) (add-to-existing-mode result (get-in sub [:extra-fields :mode]))))))) + (recur (rest subs) (add-to-existing-mode result (get-in sub [:metadata :Mode]))))))) (defn change-subscription "When a subscription is added or deleted, the collection-concept-id must be put into or deleted from the subscription cache. Get the subscriptions that match the collection-concept-id - from the database and rebuild the modes list." - [context concept-type concept] - (when (subscription-concept? concept-type concept) - (let [coll-concept-id (:collection-concept-id (:extra-fields concept)) - subs (filter #(subscription-concept? (get % :concept-type) %) - (get-subscriptions-from-db context coll-concept-id))] - (if (seq subs) - (subscription-cache/set-value context coll-concept-id (merge-modes subs)) - (subscription-cache/remove-value context coll-concept-id))))) + from the database and rebuild the modes list. Return 1 if successful 0 otherwise." + [context concept-edn] + (let [coll-concept-id (:CollectionConceptId (:metadata concept-edn)) + subs (convert-and-filter-subscriptions (get-subscriptions-from-db context coll-concept-id))] + (if (seq subs) + (subscription-cache/set-value context coll-concept-id (merge-modes subs)) + (subscription-cache/remove-value context coll-concept-id)))) + +;; +;; The functions below are for subscribing and unsubscribing and endpoint to the topic. +;; + +(defn add-delete-subscription + "Do the work to see if subscriptions are enabled and add/remove + subscription from the cache. Return nil if subscriptions are not + enabled or the concept converted to edn." + [context concept] + (when (and subscriptions-enabled? + (= :subscription (:concept-type concept))) + (let [concept-edn (convert-concept-to-edn concept)] + (change-subscription context concept-edn) + concept-edn))) + +(defn add-subscription + "Add the subscription to the cache and subscribe the subscription to + the topic." + [context concept] + (when-let [concept-edn (add-delete-subscription context concept)] + (let [topic (get-in context [:system :sns :external])] + (topic-protocol/subscribe topic concept-edn)))) + +(defn delete-subscription + "Remove the subscription from the cache and unsubscribe the subscription from + the topic." + [context concept subscription-arn] + (when-let [concept-edn (add-delete-subscription context concept)] + (let [topic (get-in context [:system :sns :external])] + (topic-protocol/unsubscribe topic {:concept-id (:concept-id concept-edn) + :subscription-arn subscription-arn})))) + +;; +;; The functions below are for refreshing the subscription cache if needed. +;; (defn create-subscription-cache-contents-for-refresh "Go through all of the subscriptions and find the ones that are @@ -84,11 +138,10 @@ Collection concept id 2: [\"New\" \"Update\" \"Delete\"] ...}" [result sub] - (let [metadata (:metadata sub) - metadata-edn (json/decode metadata true)] - (if (and (some? (:EndPoint metadata-edn)) - (= "ingest" (:Method metadata-edn))) - (let [coll-concept-id (:collection-concept-id (:extra-fields sub)) + (let [concept-edn (convert-concept-to-edn sub) + metadata-edn (:metadata concept-edn)] + (if (ingest-subscription-concept? concept-edn) + (let [coll-concept-id (:CollectionConceptId metadata-edn) concept-map (result coll-concept-id) mode (:Mode metadata-edn)] (if concept-map @@ -102,6 +155,7 @@ then update the cache with the new values. Otherwise delete the contents that no longer exists." [context] (when subscriptions-enabled? + (info "Starting refreshing the ingest subscription cache.") (let [subs (get-subscriptions-from-db context) new-contents (reduce create-subscription-cache-contents-for-refresh {} subs) cache-content-keys (subscription-cache/get-keys context)] @@ -110,7 +164,102 @@ ;; Go through and remove any cache items that are not in the new-contents map. (doall (map #(when-not (new-contents %) (subscription-cache/remove-value context %)) - cache-content-keys))))) + cache-content-keys)) + (info "Finished refreshing the ingest subscription cache.")))) + +;; +;; The functions below are for publishing messages to the topic. +;; + +(defn get-producer-granule-id-message-str + "Get the granule producer id from the metadata and create a string for the + subscription notification message." + [concept-edn] + (let [identifiers (get-in concept-edn [:metadata :DataGranule :Identifiers]) + pgi (when identifiers + (:Identifier (first + (filter #(= "ProducerGranuleId" (:IdentifierType %)) + identifiers))))] + (when pgi + (str "\"producer-granule-id\": \"" pgi "\"")))) + +(defn get-location-message-str + "Get the granule search location for the subscription notification message." + [concept] + (str "\"location\": \"" + (format "%sconcepts/%s/%s" + (t-config/format-public-root-url (:search (t-config/app-conn-info))) + (:concept-id concept) + (:revision-id concept)) + "\"")) + +(defn create-notification + "Create the notification when a subscription exists. Returns either a notification message or nil." + [concept] + (let [concept-edn (convert-concept-to-edn concept) + pgi-str (get-producer-granule-id-message-str concept-edn) + granule-ur-str (str "\"granule-ur\": \"" (get-in concept-edn [:metadata :GranuleUR]) "\"") + g-concept-id-str (str "\"concept-id\": \"" (:concept-id concept-edn) "\"") + location-str (get-location-message-str concept)] + (str "{" g-concept-id-str ", " granule-ur-str ", " (when pgi-str (str pgi-str ", ")) location-str "}"))) + +(defn create-message-attributes + "Create the notification message attributes so that the notifications can be + filtered to the correct subscribing endpoint." + [collection-concept-id mode] + {"collection-concept-id" collection-concept-id + "mode" mode}) + +(defn create-message-subject + "Creates the message subject." + [mode] + (str mode " Notification")) + +(defn get-attributes-and-subject + "Determine based on the passed in concept if the granule is new, is an update + or a delete. Use the passed in mode to determine if any subscription is interested + in a notification. If they are then return the message attributes and subject, otherwise + return nil." + [concept mode coll-concept-id] + (cond + ;; Mode = Delete. + (and (:deleted concept) + (some #(= "Delete" %) mode)) + {:attributes (create-message-attributes coll-concept-id "Delete") + :subject (create-message-subject "Delete")} + + ;; Mode = New + (and (not (:deleted concept)) + (= 1 (:revision-id concept)) + (some #(= "New" %) mode)) + {:attributes (create-message-attributes coll-concept-id "New") + :subject (create-message-subject "New")} + + ;; Mode = Update + (and (not (:deleted concept)) + (pos? (compare (:revision-id concept) 1)) + (some #(= "Update" %) mode)) + {:attributes (create-message-attributes coll-concept-id "Update") + :subject (create-message-subject "Update")})) + +(defn work-potential-notification + "Publish a notification to the topic if the passed in concept is a granule + and a subscription is interested in being informed." + [context concept] + (when (granule-concept? (:concept-type concept)) + (let [start (System/currentTimeMillis) + coll-concept-id (:parent-collection-id (:extra-fields concept)) + sub-cache-map (subscription-cache/get-value context coll-concept-id)] + (when sub-cache-map + ;; Check the mode to see if the granule notification needs to be pushed. + (let [topic (get-in context [:system :sns :internal]) + message (create-notification concept) + {:keys [attributes subject]} (get-attributes-and-subject concept sub-cache-map coll-concept-id)] + (when (and attributes subject) + (let [result (topic-protocol/publish topic message attributes subject) + duration (- (System/currentTimeMillis) start)] + (debug (format "Work potential subscription publish took %d ms." duration)) + result))))))) (comment (let [system (get-in user/system [:apps :metadata-db])] diff --git a/metadata-db-app/src/cmr/metadata_db/system.clj b/metadata-db-app/src/cmr/metadata_db/system.clj index 95ce915e15..33a99e74e1 100644 --- a/metadata-db-app/src/cmr/metadata_db/system.clj +++ b/metadata-db-app/src/cmr/metadata_db/system.clj @@ -10,19 +10,17 @@ [cmr.common.api.web-server :as web] [cmr.common.config :as cfg :refer [defconfig]] [cmr.common.jobs :as jobs] - [cmr.common.lifecycle :as lifecycle] - [cmr.common.log :as log :refer [debug info warn error]] + [cmr.common.log :as log] [cmr.common.nrepl :as nrepl] [cmr.common.system :as common-sys] [cmr.message-queue.config :as queue-config] + [cmr.message-queue.pub-sub :as pub-sub] [cmr.message-queue.queue.queue-broker :as queue-broker] [cmr.metadata-db.api.routes :as routes] [cmr.metadata-db.config :as config] [cmr.metadata-db.services.jobs :as mdb-jobs] [cmr.metadata-db.services.subscription-cache :as subscription-cache] [cmr.metadata-db.services.util :as mdb-util] - [cmr.oracle.config :as oracle-config] - [cmr.oracle.connection :as oracle] [cmr.transmit.config :as transmit-config])) ;; Design based on http://stuartsierra.com/2013/09/15/lifecycle-composition and related posts @@ -39,6 +37,7 @@ "App logging level" {:default "info"}) +#_{:clj-kondo/ignore [:unresolved-var]} (defn create-system "Returns a new instance of the whole application." ([] @@ -60,6 +59,8 @@ `system-holder [jvm-info/log-jvm-statistics-job (cache-info/create-log-cache-info-job "metadata-db")]) :queue-broker (queue-broker/create-queue-broker (config/queue-config)) + :sns {:internal (pub-sub/create-topic (queue-config/cmr-internal-subscriptions-topic-name)) + :external (pub-sub/create-topic (queue-config/cmr-subscriptions-topic-name))} :relative-root-url (transmit-config/metadata-db-relative-root-url)}] (transmit-config/system-with-connections sys [:access-control :echo-rest])))) diff --git a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj index b02904c6ae..e398179ea8 100644 --- a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj +++ b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj @@ -1,14 +1,21 @@ (ns cmr.metadata-db.test.services.subscriptions-test (:require - [clojure.test :refer [deftest is testing use-fixtures]] + [cheshire.core :as json] + [clojure.test :refer [deftest is testing use-fixtures join-fixtures]] [cmr.common.hash-cache :as hash-cache] [cmr.common.util :refer [are3]] + [cmr.message-queue.config :as msg-config] + [cmr.message-queue.pub-sub :as pub-sub] + [cmr.message-queue.test.test-util :as sqs-test-util] + [cmr.message-queue.topic.topic-protocol :as topic-protocol] [cmr.metadata-db.config :as mdb-config] [cmr.metadata-db.services.subscription-cache :as subscription-cache] [cmr.metadata-db.services.subscriptions :as subscriptions] - [cmr.redis-utils.test.test-util :as test-util])) + [cmr.redis-utils.test.test-util :as redis-test-util] + [cmr.message-queue.queue.aws-queue :as queue])) -(use-fixtures :once test-util/embedded-redis-server-fixture) +(use-fixtures :once (join-fixtures [redis-test-util/embedded-redis-server-fixture + sqs-test-util/embedded-sqs-server-fixture])) (defn create-value-set "Take a map result set and turn the vector values into set values." @@ -30,27 +37,45 @@ (let [value (mdb-config/ingest-subscription-enabled)] (is (= value subscriptions/subscriptions-enabled?)))) (testing "Testing if a passed in concept is a subscription concept" - (is (subscriptions/subscription-concept? :subscription {:extra-fields {:endpoint "some-endpoint" :method "ingest"}}))) + (is (subscriptions/ingest-subscription-concept? {:concept-type :subscription + :deleted false + :metadata {:CollectionConceptId "C12345-PROV1" + :EndPoint "some-endpoint" + :Method "ingest"} + :extra-fields {:collection-concept-id "C12345-PROV1"}}))) (testing "Testing if a passed in concept is a granule concept" (is (subscriptions/granule-concept? :granule))) - (testing "Add a subscription" - (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] '({:extra-fields {:subscription-type "granule" - :endpoint "some-endpoint" - :method "ingest" - :mode ["New"] - :collection-concept-id "C12345-PROV1"} - :concept-type :subscription}))} - (is (= 1 (subscriptions/change-subscription test-context :subscription {:extra-fields {:subscription-type "granule" - :endpoint "some-endpoint" - :method "ingest" - :mode ["New"] - :collection-concept-id "C12345-PROV1"}}))))) - (testing "Delete a subscription" - (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] '())} - (is (= 1 (subscriptions/change-subscription test-context :subscription {:extra-fields {:collection-concept-id "C12345-PROV1" - :mode ["New"] - :endpoint "some-endpoint" - :method "ingest"}}))))) + (testing "Add a subscription to the cache" + (with-bindings {#'subscriptions/get-subscriptions-from-db + (fn [_context _coll-concept-id] '({:concept-type :subscription + :deleted false + :metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\"], + \"Method\":\"ingest\"}", + :extra-fields {:collection-concept-id "C12345-PROV1"}}))} + (is (= 1 (subscriptions/change-subscription test-context {:concept-type :subscription + :deleted false + :metadata {:CollectionConceptId "C12345-PROV1" + :EndPoint "some-endpoint" + :Mode ["New"] + :Method "ingest"} + :extra-fields {:collection-concept-id "C12345-PROV1"}}))))) + (testing "Delete a subscription from the cache" + (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] '({:concept-type :subscription + :deleted true + :metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\"], + \"Method\":\"ingest\"}", + :extra-fields {:collection-concept-id "C12345-PROV1"}}))} + (is (= 1 (subscriptions/change-subscription test-context {:concept-type :subscription + :deleted true + :metadata {:CollectionConceptId "C12345-PROV1" + :EndPoint "some-endpoint" + :Mode ["New"] + :Method "ingest"} + :extra-fields {:collection-concept-id "C12345-PROV1"}}))))) (testing "Add-to-existing-mode" (are3 [expected existing-modes new-modes] @@ -89,129 +114,129 @@ "merge 1 mode." ["Update"] - '({:extra-fields {:mode ["Update"]}}) + '({:metadata {:Mode ["Update"]}}) "Merge several modes" ["New" "Update" "Delete"] - '({:extra-fields {:mode ["Update"]}} - {:extra-fields {:mode ["New"]}} - {:extra-fields {:mode ["Delete"]}}) + '({:metadata {:Mode ["Update"]}} + {:metadata {:Mode ["New"]}} + {:metadata {:Mode ["Delete"]}}) "Merge several modes 2" ["New" "Update" "Delete"] - '({:extra-fields {:mode ["Update"]}} - {:extra-fields {:mode ["New" "Delete"]}}) + '({:metadata {:Mode ["Update"]}} + {:metadata {:Mode ["New" "Delete"]}}) "Merge several modes 3 with duplicates." ["New" "Update" "Delete"] - '({:extra-fields {:mode ["Update"]}} - {:extra-fields {:mode ["New" "Delete"]}} - {:extra-fields {:mode ["New"]}} - {:extra-fields {:mode ["New" "Update"]}}))) + '({:metadata {:Mode ["Update"]}} + {:metadata {:Mode ["New" "Delete"]}} + {:metadata {:Mode ["New"]}} + {:metadata {:Mode ["New" "Update"]}}))) - (testing "adding and removing subscriptions." + (testing "adding and removing subscriptions from the cache." (are3 [expected example-record db-contents] (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-contents)} - (subscriptions/change-subscription test-context :subscription example-record) + (subscriptions/change-subscription test-context example-record) (is (= expected (create-value-set (hash-cache/get-map cache-client cache-key))))) "Adding 1 subscription" {"C12345-PROV1" (set ["Update"])} - {:extra-fields {:collection-concept-id "C12345-PROV1" - :mode ["Update"] - :endpoint "some-endpoint" - :method "ingest"}} - '({:extra-fields {:subscription-type "granule" - :endpoint "some-endpoint" - :method "ingest" - :mode ["Update"] - :collection-concept-id "C12345-PROV1"} + {:metadata {:CollectionConceptId "C12345-PROV1"}} + '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"Update\"], + \"Method\":\"ingest\"}", + :extra-fields {:collection-concept-id "C12345-PROV1"} + :deleted false :concept-type :subscription}) "Adding duplicate subscription" {"C12345-PROV1" (set ["Update"])} - {:extra-fields {:collection-concept-id "C12345-PROV1" - :mode ["Update"] - :endpoint "some-endpoint" - :method "ingest"}} - '({:extra-fields {:subscription-type "granule" - :endpoint "some-endpoint" - :method "ingest" - :mode ["Update"] - :collection-concept-id "C12345-PROV1"} + {:metadata {:CollectionConceptId "C12345-PROV1"}} + '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"Update\"], + \"Method\":\"ingest\"}", + :extra-fields {:collection-concept-id "C12345-PROV1"} + :deleted false :concept-type :subscription}) "Adding override subscription" {"C12345-PROV1" (set ["New" "Delete"])} - {:extra-fields {:collection-concept-id "C12345-PROV1" - :mode ["New" "Delete"] - :endpoint "some-endpoint" - :method "ingest"}} - '({:extra-fields {:subscription-type "granule" - :endpoint "some-endpoint" - :method "ingest" - :mode ["New" "Delete"] - :collection-concept-id "C12345-PROV1"} + {:metadata {:CollectionConceptId "C12345-PROV1"}} + '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}" + :extra-fields {:collection-concept-id "C12345-PROV1"} + :deleted false :concept-type :subscription}) "Adding new subscription that matches the one from before." {"C12345-PROV1" (set ["New" "Delete"]) "C12346-PROV1" (set ["New" "Delete"])} - {:extra-fields {:collection-concept-id "C12346-PROV1" - :mode ["New" "Delete"] - :endpoint "some-endpoint" - :method "ingest"}} - '({:extra-fields {:subscription-type "granule" - :endpoint "some-endpoint" - :method "ingest" - :mode ["New" "Delete"] - :collection-concept-id "C12345-PROV1"} - :concept-type :subscription} - {:extra-fields {:subscription-type "granule" - :collection-concept-id "C12346-PROV1" - :mode ["New" "Delete"] - :endpoint "some-endpoint" - :method "ingest"} + {:metadata {:CollectionConceptId "C12346-PROV1"}} + '({:metadata "{\"CollectionConceptId\":\"C12346-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}", + :extra-fields {:collection-concept-id "C12346-PROV1"} + :deleted false :concept-type :subscription}) "Removing 1 subscription" {"C12346-PROV1" (set ["New" "Delete"])} - {:extra-fields {:collection-concept-id "C12345-PROV1" - :mode ["New" "Delete"] - :endpoint "some-endpoint" - :method "ingest"}} + {:metadata {:CollectionConceptId "C12345-PROV1"}} ;; even though C12346-PROV1 is in the db, we are search only for ;; concepts with the collection-concept-id. - '() + '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}" + :extra-fields {:collection-concept-id "C12345-PROV1"} + :deleted true + :concept-type :subscription}) "Removing same subscription" {"C12346-PROV1" (set ["New" "Delete"])} - {:extra-fields {:collection-concept-id "C12345-PROV1" - :mode ["New" "Update"] - :endpoint "some-endpoint" - :method "ingest"}} - '() + {:metadata {:CollectionConceptId "C12345-PROV1"}} + '({:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}" + :extra-fields {:collection-concept-id "C12345-PROV1"} + :deleted true + :concept-type :subscription}) "Removing last subscription" nil - {:extra-fields {:collection-concept-id "C12346-PROV1" - :mode ["New" "Delete"] - :endpoint "some-endpoint" - :method "ingest"}} - '() + {:metadata {:CollectionConceptId "C12346-PROV1"}} + '({:metadata "{\"CollectionConceptId\":\"C12346-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}", + :extra-fields {:collection-concept-id "C12346-PROV1"} + :deleted true + :concept-type :subscription}) "Try to remove something that doesn't exist" nil - {:extra-fields {:collection-concept-id "C12345-PROV1" - :mode ["Update"] - :endpoint "some-endpoint" - :method "ingest"}} - '())))) + {:metadata {:CollectionConceptId "C12345-PROV1"}} + '())) + + (testing "adding and deleting subscriptions from the cache calling add-delete-subscription" + (let [db-contents '()] + (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-contents)} + (is (= {:metadata {:CollectionConceptId "C12345-PROV1"} + :concept-type :subscription} + (subscriptions/add-delete-subscription test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\"}" + :concept-type :subscription})))))))) (def db-result-1 '({:revision-id 1 - :deleted "false" + :deleted false :format "application/vnd.nasa.cmr.umm+json;version=1.1.1" :provider-id "PROV1" :user-id "ECHO_SYS" @@ -219,29 +244,26 @@ :native-id "erichs_ingest_subscription" :concept-id "SUB1200000005-PROV1" :metadata "{\"SubscriberId\":\"eereiter\", - \"CollectionConceptId\":\"C1200000002-PROV1\", - \"EndPoint\":\"some-endpoint\", - \"Mode\":[\"New\",\"Delete\"], - \"Method\":\"ingest\", - \"EmailAddress\":\"erich.e.reiter@nasa.gov\", - \"Query\":\"collection-concept-id=C1200000002-PROV1\", - \"Name\":\"Ingest-Subscription-Test\", - \"Type\":\"granule\", - \"MetadataSpecification\":{\"URL\":\"https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1\",\"Name\":\"UMM-Sub\",\"Version\":\"1.1.1\"}}" + \"CollectionConceptId\":\"C1200000002-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\",\"Delete\"], + \"Method\":\"ingest\", + \"EmailAddress\":\"erich.e.reiter@nasa.gov\", + \"Query\":\"collection-concept-id=C1200000002-PROV1\", + \"Name\":\"Ingest-Subscription-Test\", + \"Type\":\"granule\", + \"MetadataSpecification\":{\"URL\":\"https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1\",\"Name\":\"UMM-Sub\",\"Version\":\"1.1.1\"}}" :revision-date "2024-10-07T18:13:32.608Z" :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" :subscriber-id "eereiter" - :collection-concept-id "C1200000002-PROV1" - :endpoint "some-endpoint" - :mode ["New", "Delete"] - :method "ingest"} + :collection-concept-id "C1200000002-PROV1"} :concept-type :subscription})) (def db-result-2 '({:revision-id 1 - :deleted "false" + :deleted false :format "application/vnd.nasa.cmr.umm+json;version=1.1.1" :provider-id "PROV1" :user-id "ECHO_SYS" @@ -263,9 +285,6 @@ :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" :subscriber-id "eereiter" - :endpoint "some-endpoint" - :mode ["New", "Update"] - :method "ingest" :collection-concept-id "C12346-PROV1"} :concept-type :subscription})) @@ -273,7 +292,7 @@ (concat db-result-1 db-result-2 '({:revision-id 1 - :deleted "false" + :deleted false :format "application/vnd.nasa.cmr.umm+json;version=1.1.1" :provider-id "PROV1" :user-id "ECHO_SYS" @@ -281,15 +300,15 @@ :native-id "erichs_ingest_subscription3" :concept-id "SUB1200000008-PROV1" :metadata "{\"SubscriberId\":\"eereiter\", - \"CollectionConceptId\":\"C1200000002-PROV1\", - \"EndPoint\":\"some-endpoint\", - \"Mode\":[\"Update\"], - \"Method\":\"ingest\", - \"EmailAddress\":\"erich.e.reiter@nasa.gov\", - \"Query\":\"collection-concept-id=C1200000002-PROV1\", - \"Name\":\"Ingest-Subscription-Test\", - \"Type\":\"granule\", - \"MetadataSpecification\":{\"URL\":\"https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1\",\"Name\":\"UMM-Sub\",\"Version\":\"1.1.1\"}}" + \"CollectionConceptId\":\"C1200000002-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"Update\"], + \"Method\":\"ingest\", + \"EmailAddress\":\"erich.e.reiter@nasa.gov\", + \"Query\":\"collection-concept-id=C1200000002-PROV1\", + \"Name\":\"Ingest-Subscription-Test\", + \"Type\":\"granule\", + \"MetadataSpecification\":{\"URL\":\"https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1\",\"Name\":\"UMM-Sub\",\"Version\":\"1.1.1\"}}" :revision-date "2024-10-07T18:13:32.608Z" :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" @@ -300,7 +319,7 @@ (def db-result-4 '({:revision-id 1 - :deleted "false" + :deleted false :format "application/vnd.nasa.cmr.umm+json;version=1.1.1" :provider-id "PROV1" :user-id "ECHO_SYS" @@ -308,24 +327,21 @@ :native-id "erichs_ingest_subscription9" :concept-id "SUB1200000009-PROV1" :metadata "{\"SubscriberId\":\"eereiter\", - \"CollectionConceptId\":\"C1200000003-PROV1\", - \"EndPoint\":\"some-endpoint\", - \"Mode\":[\"New\",\"Delete\"], - \"Method\":\"ingest\", - \"EmailAddress\":\"erich.e.reiter@nasa.gov\", - \"Query\":\"collection-concept-id=C1200000003-PROV1\", - \"Name\":\"Ingest-Subscription-Test\", - \"Type\":\"granule\", - \"MetadataSpecification\":{\"URL\":\"https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1\",\"Name\":\"UMM-Sub\",\"Version\":\"1.1.1\"}}" + \"CollectionConceptId\":\"C1200000003-PROV1\", + \"EndPoint\":\"some-endpoint\", + \"Mode\":[\"New\",\"Delete\"], + \"Method\":\"ingest\", + \"EmailAddress\":\"erich.e.reiter@nasa.gov\", + \"Query\":\"collection-concept-id=C1200000003-PROV1\", + \"Name\":\"Ingest-Subscription-Test\", + \"Type\":\"granule\", + \"MetadataSpecification\":{\"URL\":\"https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1\",\"Name\":\"UMM-Sub\",\"Version\":\"1.1.1\"}}" :revision-date "2024-10-07T18:13:32.608Z" :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" :subscription-type "granule" :subscription-name "Ingest-Subscription-Test" :subscriber-id "eereiter" - :collection-concept-id "C1200000003-PROV1" - :endpoint "some-endpoint" - :mode ["New", "Delete"] - :method "ingest"} + :collection-concept-id "C1200000003-PROV1"} :concept-type :subscription})) (deftest subscription-refresh-cache-test @@ -334,20 +350,11 @@ cache-client (get-in test-context [:system :caches cache-key])] (hash-cache/reset cache-client cache-key) (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-result-1)} - (subscriptions/change-subscription test-context :subscription {:extra-fields {:collection-concept-id "C1200000002-PROV1" - :mode ["New" "Delete"] - :endpoint "some-endpoint" - :method "ingest"}})) + (subscriptions/change-subscription test-context {:metadata {:CollectionConceptId "C1200000002-PROV1"}})) (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-result-2)} - (subscriptions/change-subscription test-context :subscription {:extra-fields {:collection-concept-id "C12346-PROV1" - :mode ["New" "Update"] - :endpoint "some-endpoint" - :method "ingest"}})) + (subscriptions/change-subscription test-context {:metadata {:CollectionConceptId "C12346-PROV1"}})) (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-result-4)} - (subscriptions/change-subscription test-context :subscription {:extra-fields {:collection-concept-id "C1200000003-PROV1" - :mode ["New", "Delete"] - :endpoint "some-endpoint" - :method "ingest"}})) + (subscriptions/change-subscription test-context {:metadata {:CollectionConceptId "C1200000003-PROV1"}})) (testing "What is in the cache" (is (= {"C1200000002-PROV1" (set ["New" "Delete"]) "C12346-PROV1" (set ["New" "Update"]) @@ -363,3 +370,305 @@ (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context] '())} (subscriptions/refresh-subscription-cache test-context)) (is (nil? (hash-cache/get-map cache-client cache-key)))))) + +(deftest get-producer-granule-id-message-str-test + (testing "Getting producer granule id for the subscription notification message" + (let [concept-edn {:metadata {:DataGranule {:Identifiers [{:IdentifierType "ProducerGranuleId" + :Identifier "Algorithm-1"}]}}}] + (is (= "\"producer-granule-id\": \"Algorithm-1\"" + (subscriptions/get-producer-granule-id-message-str concept-edn))))) + (testing "Getting producer granule id that doesn't exist. " + (let [concept-edn {:metadata {:DataGranule {:Identifiers [{:IdentifierType "SomeOtherID" + :Identifier "Algorithm-1"}]}}}] + (is (= nil (subscriptions/get-producer-granule-id-message-str concept-edn)))))) + +(deftest get-location-message-str-test + (testing "Getting location url for the concept id." + (let [concept {:concept-id "G12345-PROV1" + :revision-id 1}] + (is (= "\"location\": \"http://localhost:3003/concepts/G12345-PROV1/1\"" + (subscriptions/get-location-message-str concept)))))) + +(deftest create-notification-test + (testing "Getting the notification for a concept." + (let [concept {:concept-id "G12345-PROV1" + :revision-id 1 + :metadata "{\"GranuleUR\": \"GranuleUR\", + \"DataGranule\": {\"Identifiers\": [{\"IdentifierType\": \"ProducerGranuleId\", + \"Identifier\": \"Algorithm-1\"}]}}"}] + (is (= (str "{\"concept-id\": \"G12345-PROV1\", " + "\"granule-ur\": \"GranuleUR\", " + "\"producer-granule-id\": \"Algorithm-1\", " + "\"location\": \"http://localhost:3003/concepts/G12345-PROV1/1\"}") + (subscriptions/create-notification concept)))))) + +(deftest create-message-attributes-test + (testing "Creating the message attributes." + (let [collection-concept-id "C12345-PROV1" + mode "New"] + (is {"collection-concept-id" "C12345-PROV1" + "mode" "New"} + (subscriptions/create-message-attributes collection-concept-id mode))))) + +(deftest create-message-subject-test + (testing "Creating the message subject." + (let [mode "Delete"] + (is (= "Delete Notification" + (subscriptions/create-message-subject mode)))))) + +(deftest get-attributes-and-subject-test + (testing "Getting notificaiton attributes and subject." + (are3 + ;;concept mode coll-concept-id + [expected concept mode coll-concept-id] + (is (= expected + (subscriptions/get-attributes-and-subject concept mode coll-concept-id))) + + "Deleted concept" + {:attributes {"collection-concept-id" "C12345-PROV1" + "mode" "Delete"} + :subject "Delete Notification"} + {:deleted true} + ["New" "Delete"] + "C12345-PROV1" + + "Deleted concept, but not looking for the mode." + nil + {:deleted true} + ["New" "Update"] + "C12345-PROV1" + + "Deleted concept, but not looking for the mode, making sure no other condition is met." + nil + {:deleted true + :revision-id 2} + ["New" "Update"] + "C12345-PROV1" + + "New concept." + {:attributes {"collection-concept-id" "C12345-PROV1" + "mode" "New"} + :subject "New Notification"} + {:deleted false + :revision-id 1} + ["New" "Update"] + "C12345-PROV1" + + "Update concept." + {:attributes {"collection-concept-id" "C12345-PROV1" + "mode" "Update"} + :subject "Update Notification"} + {:deleted false + :revision-id 3} + ["New" "Update"] + "C12345-PROV1"))) + +(defn set-db-result + "Sets the mock db result with a real queue endpoint to + test real subscriptions and publishing of a message." + [queue-url] + (conj '() + {:revision-id 1 + :deleted false + :format "application/vnd.nasa.cmr.umm+json;version=1.1.1" + :provider-id "PROV1" + :user-id "ECHO_SYS" + :transaction-id "2000000009M" + :native-id "erichs_ingest_subscription" + :concept-id "SUB1200000005-PROV1" + :metadata (format + "{\"SubscriberId\":\"eereiter\", + \"CollectionConceptId\":\"C1200000002-PROV1\", + \"EndPoint\":\"%s\", + \"Mode\":[\"New\",\"Delete\"], + \"Method\":\"ingest\", + \"EmailAddress\":\"erich.e.reiter@nasa.gov\", + \"Query\":\"collection-concept-id=C1200000002-PROV1\", + \"Name\":\"Ingest-Subscription-Test\", + \"Type\":\"granule\", + \"MetadataSpecification\":{\"URL\":\"https://cdn.earthdata.nasa.gov/umm/subscription/v1.1.1\",\"Name\":\"UMM-Sub\",\"Version\":\"1.1.1\"}}" + queue-url) + :revision-date "2024-10-07T18:13:32.608Z" + :extra-fields {:normalized-query "76c6d7a828ef81efb3720638f335f65c" + :subscription-type "granule" + :subscription-name "Ingest-Subscription-Test" + :subscriber-id "eereiter" + :collection-concept-id "C1200000002-PROV1"} + :concept-type :subscription})) + +(defn get-cmr-internal-subscription-queue-url + "helper function for the work-potential-notitification-test + to get the internal subscription queue url to receive messages + from it." + [test-context] + (let [topic (get-in test-context [:system :sns :internal]) + internal-subscriptions @(:subscription-atom topic) + internal-sub (first (filter #(nil? (:concept-id %)) internal-subscriptions))] + (:queue-url internal-sub))) + +(defn get-cmr-subscription-dead-letter-queue-url + "helper function for the work-potential-notitification-test + to get the internal subscription queue url to receive messages + from it." + [test-context sub-concept-id] + (let [topic (get-in test-context [:system :sns :internal]) + internal-subscriptions @(:subscription-atom topic) + internal-sub (first (filter #(= sub-concept-id (:concept-id %)) internal-subscriptions))] + (:dead-letter-queue-url internal-sub))) + +(defn check-messages-and-contents + "This function checks to see if a message was received from a queue + and if it was, it checks the contents for the work-potiential-notification-test. + If a message was not received a test failure is produced." + [messages sqs-client queue-url] + (let [message (first messages)] + ;; check to see if a message exists + (is (some? message)) + ;; don't check the message contents if the message doesn't exist because an exception is thrown. + (when (some? message) + (let [message-str (.body message) + message (json/decode message-str true)] + (is (= "G12345-PROV1" (:concept-id message))) + (is (= '(:concept-id :granule-ur :producer-granule-id :location) (keys message))) + (is (some? (queue/delete-messages sqs-client queue-url messages))))))) + +(deftest work-potential-notification-test + (let [cache-key subscription-cache/subscription-cache-key + test-context {:system {:caches {cache-key (subscription-cache/create-cache-client)} + :sns {:internal (pub-sub/create-topic nil) + :external (pub-sub/create-topic nil)}}} + sqs-client (queue/create-sqs-client (msg-config/sqs-server-url)) + queue-name "cmr-subscription-client-test-queue" + queue-url (queue/create-queue sqs-client queue-name) + db-result (set-db-result queue-url) + concept-metadata (format "{\"CollectionConceptId\": \"C1200000002-PROV1\", + \"EndPoint\": \"%s\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}" + queue-url)] + + (testing "Concept not a granule" + (is (nil? (subscriptions/work-potential-notification test-context {:concept-type :collection})))) + (testing "Concept is a granule, but not in ingest subscription cache." + (is (nil? (subscriptions/work-potential-notification test-context {:concept-type :granule + :extra-fields {:parent-collection-id "C12349-PROV1"}})))) + (testing "Concept will get published." + (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-result)} + (let [sub-concept {:metadata concept-metadata + :concept-type :subscription + :concept-id "SUB1200000005-PROV1"} + granule-concept {:concept-type :granule + :deleted false + :revision-id 1 + :concept-id "G12345-PROV1" + :metadata "{\"GranuleUR\": \"GranuleUR\", + \"DataGranule\": {\"Identifiers\": [{\"IdentifierType\": \"ProducerGranuleId\", + \"Identifier\": \"Algorithm-1\"}]}}" + :extra-fields {:parent-collection-id "C1200000002-PROV1"}}] + + ;; if successful, the subscription concept-id is returned for local topic. + (is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept))) + + ;; the subscription is replaced when the subscription already exists. + (is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept))) + + ;; For this test add the subscription to the internal topic to test publishing. + (let [topic (get-in test-context [:system :sns :internal]) + sub-concept-edn (subscriptions/add-delete-subscription test-context sub-concept)] + (topic-protocol/subscribe topic sub-concept-edn)) + + ;; publish message. this should publish to 2 queues, the normal internal queue and to + ;; the client-test-queue. + (is (some? (subscriptions/work-potential-notification test-context granule-concept))) + + ;; Get message from subscribed queue. + (check-messages-and-contents (queue/receive-messages sqs-client queue-url) sqs-client queue-url) + + ;; Get message from infrastructure internal queue. + (let [internal-queue-url (get-cmr-internal-subscription-queue-url test-context)] + (check-messages-and-contents (queue/receive-messages sqs-client internal-queue-url) sqs-client internal-queue-url)) + + ;; Test sending to dead letter queue. + (is (some? (queue/delete-queue sqs-client queue-url))) + (subscriptions/work-potential-notification test-context granule-concept) + + ;; Receive message from dead letter queue. + (let [dead-letter-queue-url (get-cmr-subscription-dead-letter-queue-url test-context (sub-concept :concept-id))] + (check-messages-and-contents (queue/receive-messages sqs-client dead-letter-queue-url) sqs-client dead-letter-queue-url)) + + ;; Just delete the message from the internal infrastrcture queue. + (let [internal-queue-url (get-cmr-internal-subscription-queue-url test-context) + messages (queue/receive-messages sqs-client internal-queue-url)] + (is (some? messages)) + (when messages + (is (some? (queue/delete-messages sqs-client internal-queue-url messages)))))))) + + (testing "Concept will be unsubscribed." + (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] (conj '() (-> (first (set-db-result queue-url)) + (assoc :deleted true))))} + (let [sub-concept {:metadata concept-metadata + :deleted true + :concept-type :subscription + :concept-id "SUB1200000005-PROV1"} + subscription-arn nil] + (is (= (:concept-id sub-concept) (subscriptions/delete-subscription test-context sub-concept subscription-arn))) + ;; Also remove subscription from internal queue. + (let [topic (get-in test-context [:system :sns :internal])] + (topic-protocol/unsubscribe topic sub-concept))))))) + + (defn work-potential-notification-with-real-aws + "This function exists to manually test out the same code as + tested above, but using real AWS instead of a mocked topic. This function is + not ment to be run in bamboo. Redis needs to be up and running as does elasticmq." + [] + (with-redefs [msg-config/queue-type (fn [] "aws") + msg-config/app-environment (fn [] "sit")] + (println "internal topic: " (msg-config/cmr-internal-subscriptions-topic-name)) + (let [cache-key subscription-cache/subscription-cache-key + test-context {:system {:caches {cache-key (subscription-cache/create-cache-client)} + ;; These topic names are hard coded because the redefs are not working + ;; when calling another namespace. + :sns {:internal (pub-sub/create-topic "cmr-internal-subscriptions-sit") + :external (pub-sub/create-topic "cmr-subscriptions-sit")}}} + sqs-client (queue/create-sqs-client) + queue-name "cmr-subscription-client-test-queue" + queue-url (queue/create-queue sqs-client queue-name) + queue-arn (queue/get-queue-arn sqs-client queue-url) + _ (println "queue-url:" queue-arn) + db-result (set-db-result queue-arn) + concept-metadata (format "{\"CollectionConceptId\": \"C1200000002-PROV1\", + \"EndPoint\": \"%s\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}" + queue-arn)] + (testing "Concept will get published." + (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-result)} + (let [sub-concept {:metadata concept-metadata + :concept-type :subscription + :concept-id "SUB1200000005-PROV1"} + granule-concept {:concept-type :granule + :deleted false + :revision-id 1 + :concept-id "G12345-PROV1" + :metadata "{\"GranuleUR\": \"GranuleUR\", + \"DataGranule\": {\"Identifiers\": [{\"IdentifierType\": \"ProducerGranuleId\", + \"Identifier\": \"Algorithm-1\"}]}}" + :extra-fields {:parent-collection-id "C1200000002-PROV1"}} + subscription-arn (subscriptions/add-subscription test-context sub-concept)] + (is (some? subscription-arn)) + (when subscription-arn + (is (some? (subscriptions/delete-subscription test-context sub-concept subscription-arn)))) + + ;; publish message. this should publish to the internal queue + (is (some? (subscriptions/work-potential-notification test-context granule-concept))) + + (let [internal-queue-url "https://sqs.us-east-1.amazonaws.com/832706493240/cmr-internal-subscriptions-queue-sit" + messages (queue/receive-messages sqs-client internal-queue-url) + message-str (.body (first messages)) + message (json/decode message-str true) + real-message (json/decode (:Message message) true)] + (println "message:" message) + (println ":Message of message" (:Message message)) + (is (= "G12345-PROV1" (:concept-id real-message))) + (is (= '(:concept-id :granule-ur :producer-granule-id :location) (keys real-message))) + (is (some? (queue/delete-messages sqs-client internal-queue-url messages)))))))))) diff --git a/search-app/project.clj b/search-app/project.clj index 2a49c64488..f5e2a195ec 100644 --- a/search-app/project.clj +++ b/search-app/project.clj @@ -10,6 +10,7 @@ [nasa-cmr/cmr-common-lib "0.1.1-SNAPSHOT"] [nasa-cmr/cmr-elastic-utils-lib "0.1.0-SNAPSHOT"] [nasa-cmr/cmr-metadata-db-app "0.1.0-SNAPSHOT"] + [nasa-cmr/cmr-message-queue-lib "0.1.0-SNAPSHOT"] [nasa-cmr/cmr-orbits-lib "0.1.0-SNAPSHOT"] [nasa-cmr/cmr-redis-utils-lib "0.1.0-SNAPSHOT"] [nasa-cmr/cmr-spatial-lib "0.1.0-SNAPSHOT"] diff --git a/system-int-test/test/cmr/system_int_test/ingest/subscription/subscription_processing_test.clj b/system-int-test/test/cmr/system_int_test/ingest/subscription/subscription_processing_test.clj index 09c5e157ee..ea11b8463d 100644 --- a/system-int-test/test/cmr/system_int_test/ingest/subscription/subscription_processing_test.clj +++ b/system-int-test/test/cmr/system_int_test/ingest/subscription/subscription_processing_test.clj @@ -2,7 +2,7 @@ "CMR subscription processing tests." (:require [clj-time.core :as t] - [clojure.test :refer :all] + [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] [cmr.access-control.test.util :as ac-util] [cmr.ingest.services.subscriptions-helper :as jobs] [cmr.mock-echo.client.echo-util :as echo-util] @@ -52,7 +52,7 @@ integration tests. If send-subscription-emails is called in tests without send-email being mocked, errors will be returned when attempting to connect to the mail server in postal-core/send-message." - [email-settings email-content]) + [_email-settings _email-content]) (deftest ^:oracle subscription-job-manual-time-constraint-test "This test is used to validate that email-subscription-processing will use a @@ -161,9 +161,9 @@ (data-umm-c/collection {:ShortName "coll1" :EntryTitle "entry-title1"}) {:token "mock-echo-system-token"}) - gran1 (create-granule-and-index "PROV1" coll1 "Granule1") + _ (create-granule-and-index "PROV1" coll1 "Granule1") ;; Setup subscriptions - sub1 (subscription-util/create-subscription-and-index coll1 "test_sub_prov1" "user2" "provider=PROV1")] + _ (subscription-util/create-subscription-and-index coll1 "test_sub_prov1" "user2" "provider=PROV1")] (testing "Using the manual endpoint does not update last-notified-at for subscriptions" (let [system-context (system/context) @@ -207,13 +207,13 @@ :EntryTitle "entry-title1"}) {:token "mock-echo-system-token"}) - coll2 (data-core/ingest-umm-spec-collection "PROV1" - (data-umm-c/collection {:ShortName "coll2" - :EntryTitle "entry-title2"}) - {:token "mock-echo-system-token"}) + _ (data-core/ingest-umm-spec-collection "PROV1" + (data-umm-c/collection {:ShortName "coll2" + :EntryTitle "entry-title2"}) + {:token "mock-echo-system-token"}) _ (index/wait-until-indexed) ;; Setup subscriptions - sub1 (subscription-util/create-subscription-and-index coll1 "test_sub_prov1" "user2" "provider=PROV1")] + _ (subscription-util/create-subscription-and-index coll1 "test_sub_prov1" "user2" "provider=PROV1")] (testing "First query executed does not have a last-notified-at and looks back 24 hours" (let [gran1 (create-granule-and-index "PROV1" coll1 "Granule1") @@ -224,8 +224,6 @@ (map :concept-id))] (is (= (:concept-id gran1) (first results))))) - (dev-system/advance-time! 10) - (testing "Second run finds only granules created since the last notification" (let [gran2 (create-granule-and-index "PROV1" coll1 "Granule2") response (->> (system/context) @@ -256,7 +254,7 @@ (testing "Tests subscriber-id filtering in subscription email processing job" (let [user1-group-id (echo-util/get-or-create-group (system/context) "group1") ;; User 1 is in group1 - user1-token (echo-util/login (system/context) "user1" [user1-group-id]) + _ (echo-util/login (system/context) "user1" [user1-group-id]) _ (echo-util/ungrant (system/context) (-> (access-control/search-for-acls (system/context) {:provider "PROV1" @@ -317,14 +315,14 @@ (data-umm-c/collection {:ShortName "coll2" :EntryTitle "entry-title2"}) {:token "mock-echo-system-token"}) - coll3 (data-core/ingest-umm-spec-collection "PROV1" + _ (data-core/ingest-umm-spec-collection "PROV1" (data-umm-c/collection {:ShortName "coll3" :EntryTitle "entry-title3" :AccessConstraints (data-umm-c/access-constraints {:Value 51 :Description "Those files are for British eyes only."})}) {:token "mock-echo-system-token"}) - coll4 (data-core/ingest-umm-spec-collection "PROV1" + _ (data-core/ingest-umm-spec-collection "PROV1" (data-umm-c/collection {:ShortName "coll4" :EntryTitle "entry-title4" @@ -358,7 +356,7 @@ {:granule-ur "Granule1" :access-value 33}) {:token "mock-echo-system-token"}) - gran2 (data-core/ingest "PROV1" + _ (data-core/ingest "PROV1" (data-granule/granule-with-umm-spec-collection coll1 (:concept-id coll1) {:granule-ur "Granule2" diff --git a/transmit-lib/src/cmr/transmit/search.clj b/transmit-lib/src/cmr/transmit/search.clj index 1df234570f..4017159246 100644 --- a/transmit-lib/src/cmr/transmit/search.clj +++ b/transmit-lib/src/cmr/transmit/search.clj @@ -1,6 +1,7 @@ (ns cmr.transmit.search "Provide functions to invoke search app" (:require + [cheshire.core :as json] [clj-http.client :as client] [clojure.data.xml :as xml] [cmr.common.api.context :as ch] @@ -21,16 +22,18 @@ #_{:clj-kondo/ignore [:unresolved-symbol]} (defn-timed save-subscription-notification-time "make an http call to the database application" - [context sub-id] + [context sub-id last-notified-time] (let [conn (config/context->app-connection context :metadata-db) request-url (str (conn/root-url conn) (format "/subscription/%s/notification-time" sub-id)) + request-body {:last-notified-time last-notified-time} params (merge (config/conn-params conn) {:accept :xml :headers (merge (token-header context) {:client-id config/cmr-client-id}) - :throw-exceptions false}) + :throw-exceptions false + :body (json/generate-string request-body)}) response (client/put request-url params) {:keys [body]} response status (int (:status response))] @@ -106,7 +109,7 @@ (errors/internal-error! (format "Granule search failed. status: %s body: %s" status body))))) -(declare validate-search-params) +(declare validate-search-params context params concept-type) (defn-timed validate-search-params "Attempts to search granules using given params via a POST request. If the response contains a non-200 http code, returns the response body." diff --git a/transmit-lib/test/cmr/transmit/test/search.clj b/transmit-lib/test/cmr/transmit/test/search.clj index a030feb09a..d912ccf400 100644 --- a/transmit-lib/test/cmr/transmit/test/search.clj +++ b/transmit-lib/test/cmr/transmit/test/search.clj @@ -3,6 +3,7 @@ the functionality of the function called, but instead will check that the request includes a client id." (:require + [clj-time.core :as time] [clojure.test :refer [deftest is testing]] [clj-http.client :as client] [cmr.transmit.search :as search])) @@ -24,7 +25,7 @@ (format "Failed testing %s" (:url arg))) {:status 204 :body ""})] (with-redefs [client/request action-tester] - (let [result (search/save-subscription-notification-time context "sub-id-1")] + (let [result (search/save-subscription-notification-time context "sub-id-1" (str (time/now)))] (is (nil? result)))))) (testing "check for client id"